《KAFKA官方文档》设计与实现(一)

5.设计与实现(IMPLEMENTATION)

5.1 API 设计

生产者 APIS

生产者API包含2个producers-kafka.producer.SyncProducer
kafka.producer.async.AsyncProducer。示例代码如下:

class Producer {

/* Sends the data, partitioned by key to the topic using either the */
/* synchronous or the asynchronous producer */
/*使用同步或者异步的生产者,发送单条消息至由key所对应的topic的分区*/
public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);

/* Sends a list of data, partitioned by key to the topic using either */
/* the synchronous or the asynchronous producer */
/*使用同步或者异步的生产者,发送一系列数据至由key所对应的topic的分区*/
public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);

/* Closes the producer and cleans up */
/*关闭生产者并做相应清理*/
public void close();

}

其目的就是通过一个单一的API向客户端暴露所有的生产者功能。kafka生产者

  • 可以处理多个生产者的排队以及缓冲请求以及异步地分发批量的数据:
    kafka.producer.Producer对于多个生产者的请求数据(producer.type=async),在序列化和分发它们至相应的kafka节点分区之前,其有能力对它们进行批量处理。而批量处理的大小可由少量的配置参数完成。当数据进入至队列,它们将被缓冲在队列里面,直到queue.time超时或者达到了配置(batch.size)的批量处理的最大值.后台的异步线程(kafka.producer.async.ProducerSendThread)负责将队列里的数据批量取出并让kafka.producer.EventHandler进行序列化工作,且将数据发送至kafka相应的节点分区。通过设置event.handler配置参数,即可实现一个自定义的事件处理器(event handler)。不论对于植入自定义日志/跟踪代码,还是自定义监控逻辑,能在生产者队列管道的不同阶段注入回调函数是极其有帮助的。一种可能的方案是通过实现kafka.producer.async.CallbackHandler接口并且对该类设置callback.handler配置参数。
  • 通过用户自定义的Encoder实现对数据的序列化操作:

interface Encoder<T> {

public Message toMessage(T data);

}

默认的Encoder``是kafka.serializer.DefaultEncoder“`

  • 通过用户设置(可选)的Partitioner提供基于软件层面的负载均衡(slb):
    kafka.producer.Partitioner会影响到数据传输时的路由策略。

interface Partitioner<T> {

int partition(T key, int numPartitions);

}

分区API使用key以及可用节点分区来返回一个分区id。这个id通常用作有序broker_ids的索引,同时节点分区(partitions)将会用这个id挑选出一个分区去处理生产者的请求。默认的分区策略是是对key进行hash,并对分区数目取余,即hash(key)%numPartitions。如果key为null,那么将会挑选出一个随机的节点。如果想要实现自定义的分区策略,也可以通过设置partitioner.class配置参数实现。

消费者 APIS

kafka提供两种级别的消费者APIS。对于普通、简单的消费者API,其仅包含对单个节点的连接,且可关闭发送给server网络请求。这个API是完全无状态的,每个网络请求将携带偏移量,用户可以根据自己的选择是否保留这些元数据。

高级的消费者API不仅隐藏了kafka集群的细节,而且可以消费集群中的任意一台机器而不用关心其背后的网络拓扑。同时,它也保留了消息是否被消费的状态。另外,高级别的消费者API还支持对依据过滤表达式来对订阅的topic进行过滤(譬如白名单或者黑名单等类似的正则表达式)

普通的 API

class SimpleConsumer {

/*向节点发出拉取消息的请求,并返回消息的数据集*/
public ByteBufferMessageSet fetch(FetchRequest request);

/*发送批量拉取消息的请求,返回响应数据集*/
public MultiFetchResponse multifetch(List<FetchRequest> fetches);

/**
*在给定时间前返回有效的偏移量(分区容量最大值)数据集,且为降序排列
*
* @param time: 毫秒,
* 如果设置了 OffsetRequest$.MODULE$.LATEST_TIME(),则可以从最新的偏移量获取消息
* 如果设置了 OffsetRequest$.MODULE$.EARLIEST_TIME(), 则可以从最早的偏移获取消息.
*/
public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}

普通消费者API通常用于实现高级API,以及用于一些离线消费者,这些消费者对于保持状态有特殊的要求

高级 API

/*创建一个对kafka集群的连接*/
ConsumerConnector connector = Consumer.create(consumerConfig);

interface ConsumerConnector {

/**
* 此方法用于获取KafkaStreams的集合,这个集合是
* MessageAndMetadata对象的迭代器,通过这个对象你可以获取到与元数据(目前仅指topic)相关联的消息
* Input: a map of <topic, #streams>
* Output: a map of <topic, list of message streams>
*/
public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);

/**
* 你可以获取KafkaStreams的集合, 对符合TopicFilter过滤后topic消息进行迭代(TopicFilter是用标准Java正则表达式封装的topic白名单或黑名单)
*/
public List<KafkaStream> createMessageStreamsByFilter(
TopicFilter topicFilter, int numStreams);

/* 提交截止至目前所有的已消费的消息的偏移量 */
public commitOffsets()

/* 关闭连接 */
public shutdown()
}

这个API围绕迭代器并由KafkaStream类实现。每个kafkastream表示从一个或多个服务器的一个或多个分区的信息流。每个流用于单线程处理,所以客户端可以在创建调用中提供所需的流数。因此,流可能代表多个服务器分区的合并(对应于处理线程的数量),但每个分区只会流向一个流。

createMessageStreams方法调用已在某个topic注册的consumer,这将导致消费者/kafka节点分配的再平衡。API鼓励在一个调用中创建多个主题流,以最小化这种重新平衡。createMessageStreamsByFilter方法调用(额外的)注册的watcher去发现新的符合被过滤的topic。注意通过createMessageStreamsByFilter方法返回的每个流可能会迭代多个topic的消息(譬如,过滤器中允许多个topic)

5.2 网络层

Kafka网络层是一个相当简单的NIO服务器,这个将不会进行详细的阐述。sendfile的实现是由MessageSet接口和writeTo方法完成。这使得备份文件的信息集合,使用更有效的transferTo实现而不是中间缓冲写。线程模型是一个单线程和用来处理每个固定连接数的N个处理器线程组成。这种设计已经在其他地方进行了充分的测试,并且被公认为是简单和快速的实现。该协议保持相当简洁的形式,以便将来更多其他类型语言的客户端实现。

5.3 消息

消息由固定大小的head、可变长度的不透明密钥键字节数组和可变长度的不透明值字节数组组成.消息头包含如下的一些字段:
– CRC32 用以检测消息的截取和损坏
– 格式版本
– 鉴别器的一个属性
– 时间戳

使键和值保持不透明是一个正确的确定:现在序列化包有很大的进展,任何特定的选择都不适合所有的使用.更不用说,一个特定的应用程序使用卡夫卡可能会指定一个特定的序列化类型作为其使用的一部分。MessageSet接口仅仅只是一个迭代器,用于迭代方法产生的消息,这个方法对NiO通道进行批量读取和写入。

转载自 并发编程网 - ifeve.com

时间: 2024-09-01 07:45:41

《KAFKA官方文档》设计与实现(一)的相关文章

《Kafka官方文档》设计(一)

Design 1. Motivation 我们设计Kafka用来作为统一的平台来处理大公司可能拥有的所有实时数据源.为了做到这点,我们必须思考大量的使用场景. 它必须有高吞吐去支持大数据流,例如实时日志聚合. 它必须优雅的处理数据积压,以支持定期从离线系统加载数据. 这也以为这系统必须支持低延迟的分发来处理传统消息系统的场景. 我们想支持分区的.分布式的.实时的处理数据源并创建新的数据源,这推动了我们的分区和消费模型. 最后,将流反馈到其他系统进行服务的情况下,我们知道系统必须能够保证容错性,在

《Kafka官方文档》设计(二)

6. Message Delivery Semantics 现在我们对Producer和Consumer已经有了一定的了解,接着我们来讨论Kafka在Producer和Consumer上提供的语义.显然的,在分发消息时是可以有多种语义的: At most once:消息可能丢失,但不会重复投递 At least once:消息不会丢失,但可能会重复投递 Exactly once:消息不丢失.不重复,会且只会被分发一次(真正想要的) 值得注意的是这分为两个问题:发布消息的可用性和消费消息的可用性.

《KAFKA官方文档》设计与实现(二)

5.4 消息格式 /** * 1. 消息的4字节CRC32 * 2. 一个字节的 identifier ,用以格式的变化,变化的值为0 或者1 * 3. 一个字节的 identifier属性,允许消息的注释与版本无关 * 位 0 ~ 2 : 压缩编解码 * 0 : 无压缩 * 1 : gzip * 2 : snappy * 3 : lz4 * bit 3 : 时间戳类型 * 0 : 创建时间 * 1 : 日志追加时间 * bit 4 ~ 7 : 保留位 * 4. (可选的) 8字节时间戳只有当"

《KAFKA官方文档》翻译邀请

之前在北京面试,很多应聘者都提到自己使用过KAFAKA,所以计划组织同学们翻译<KAFKA官方指南>,欢迎有兴趣的同学参与. 如何领取 通过评论领取想要翻译的文章,每次领取一章或一节(根据内容长短),翻译完后再领取其他章节.领取完成之后,译文最好在一个星期内翻译完成,不要超过两周,如果不能完成翻译,也欢迎你邀请其他同学和你一起完成翻译.请谨慎领取,很多文章领取了没有翻译,导致文章很长时间没人翻译. 如何提交? 翻译完成之后请登录到并发网提交成待审核状态,会有专门的编辑校对后进行发布.如果多篇文

《Kafka 官方文档》 介绍

介绍 Apache Kafka 是 一个分布式数据流平台. 这意味什么呢? 我们认为一个数据流平台有三种能力: 它让你发布和订阅数据流. 在这方面他与消息队列或企业级消息系统很像. 它让你具有很强容灾性的存储数据流. 它让你及时的处理数据流. 那么Kafka适合做什么呢? 它通常被使用在两大类应用中: 搭建可以使数据在系统或应用之间流动的实时数据流管道(pipelines) 搭建可以针对流数据实行实时转换或作出相应反应的数据流应用 为了了解Kafka具体如何实现这些功能, 我们来从底层开始,探索

《KAFKA官方文档》入门指南(一)

1.入门指南 1.1简介 Apache的Kafka是一个分布式流平台(a distributed streaming platform).这到底意味着什么? 我们认为,一个流处理平台应该具有三个关键能力: 它可以让你发布和订阅记录流.在这方面,它类似于一个消息队列或企业消息系统. 它可以让你持久化收到的记录流,从而具有容错能力. 它可以让你处理收到的记录流. Kafka擅长哪些方面? 它被用于两大类应用: 建立实时流数据管道从而能够可靠地在系统或应用程序之间的共享数据 构建实时流应用程序,能够变

《KAFKA官方文档》第三章:快速入门(二)

第八步:使用Kafka流(Kafka Streams)处理数据 Kafka流是一个针对存储于Kafka brokers上的数据进行实时流处理和分析的客户端类库.快速入门中的示例将展示如何使用这个类库实现一个数据流处理应用.下面是其中的WordCountDemo数单词示例代码片段(转换成Java8的lambda表达式更便于阅读). "` // 字符串和长整型的序列化器与反序列化器(serde) final Serde stringSerde = Serdes.String(); final Ser

《KAFKA官方文档》入门指南(三)

第7步:使用Kafka连接导入/导出数据 从控制台写入数据和写回控制台是一个很方便入门的例子,但你可能想用Kafka使用其他来源的数据或导出Kafka的数据到其他系统.相对于许多系统需要编写定制集成的代码,您可以使用Kafka连接到系统去导入或导出数据. Kafka Connect是包括在Kafka中一个工具,用来导入导出数据到Kafka.它是connectors的一个可扩展工具,其执行定制逻辑,用于与外部系统交互.在这个快速入门,我们将看到如何使用Kafka Connect做一些简单的连接器从

《KAFKA官方文档》5.2 APIs

2. APIs Kafka包含四种核心的API: Producer API支持应用将数据流发送到Kafka集群的主题. Consumer API支持应用从Kafka集群的主题中读取数据流. Streams API支持数据流从输入主题转化到输出主题. Connect API支持实现持续地从一些源系统或应用划入Kafka或者从Kafka推入一些源系统或应用的接口. Kafka通过独立于语言的协议公开其所有功能,该协议具有可用于诸多编程语言的客户端.但是,只有Java客户端作为主Kafka项目的一部分