5.设计与实现(IMPLEMENTATION)
5.1 API 设计
生产者 APIS
生产者API包含2个producers-kafka.producer.SyncProducer
和kafka.producer.async.AsyncProducer。
示例代码如下:
class Producer {
/* Sends the data, partitioned by key to the topic using either the */
/* synchronous or the asynchronous producer */
/*使用同步或者异步的生产者,发送单条消息至由key所对应的topic的分区*/
public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);
/* Sends a list of data, partitioned by key to the topic using either */
/* the synchronous or the asynchronous producer */
/*使用同步或者异步的生产者,发送一系列数据至由key所对应的topic的分区*/
public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);
/* Closes the producer and cleans up */
/*关闭生产者并做相应清理*/
public void close();
}
其目的就是通过一个单一的API向客户端暴露所有的生产者功能。kafka生产者
- 可以处理多个生产者的排队以及缓冲请求以及异步地分发批量的数据:
kafka.producer.Producer
对于多个生产者的请求数据(producer.type=async
),在序列化和分发它们至相应的kafka节点分区之前,其有能力对它们进行批量处理。而批量处理的大小可由少量的配置参数完成。当数据进入至队列,它们将被缓冲在队列里面,直到queue.time
超时或者达到了配置(batch.size
)的批量处理的最大值.后台的异步线程(kafka.producer.async.ProducerSendThread
)负责将队列里的数据批量取出并让kafka.producer.EventHandler
进行序列化工作,且将数据发送至kafka相应的节点分区。通过设置event.handler
配置参数,即可实现一个自定义的事件处理器(event handler)。不论对于植入自定义日志/跟踪代码,还是自定义监控逻辑,能在生产者队列管道的不同阶段注入回调函数是极其有帮助的。一种可能的方案是通过实现kafka.producer.async.CallbackHandler
接口并且对该类设置callback.handler
配置参数。
- 通过用户自定义的
Encoder
实现对数据的序列化操作:
interface Encoder<T> {
public Message toMessage(T data);
}
默认的Encoder``是
kafka.serializer.DefaultEncoder“`
- 通过用户设置(可选)的
Partitioner
提供基于软件层面的负载均衡(slb):kafka.producer.Partitioner
会影响到数据传输时的路由策略。
interface Partitioner<T> {
int partition(T key, int numPartitions);
}
分区API使用key以及可用节点分区来返回一个分区id。这个id通常用作有序broker_ids
的索引,同时节点分区(partitions)将会用这个id挑选出一个分区去处理生产者的请求。默认的分区策略是是对key进行hash,并对分区数目取余,即hash(key)%numPartitions
。如果key为null,那么将会挑选出一个随机的节点。如果想要实现自定义的分区策略,也可以通过设置partitioner.class
配置参数实现。
消费者 APIS
kafka提供两种级别的消费者APIS。对于普通、简单的消费者API,其仅包含对单个节点的连接,且可关闭发送给server网络请求。这个API是完全无状态的,每个网络请求将携带偏移量,用户可以根据自己的选择是否保留这些元数据。
高级的消费者API不仅隐藏了kafka集群的细节,而且可以消费集群中的任意一台机器而不用关心其背后的网络拓扑。同时,它也保留了消息是否被消费的状态。另外,高级别的消费者API还支持对依据过滤表达式来对订阅的topic进行过滤(譬如白名单或者黑名单等类似的正则表达式)
普通的 API
class SimpleConsumer {
/*向节点发出拉取消息的请求,并返回消息的数据集*/
public ByteBufferMessageSet fetch(FetchRequest request);
/*发送批量拉取消息的请求,返回响应数据集*/
public MultiFetchResponse multifetch(List<FetchRequest> fetches);
/**
*在给定时间前返回有效的偏移量(分区容量最大值)数据集,且为降序排列
*
* @param time: 毫秒,
* 如果设置了 OffsetRequest$.MODULE$.LATEST_TIME(),则可以从最新的偏移量获取消息
* 如果设置了 OffsetRequest$.MODULE$.EARLIEST_TIME(), 则可以从最早的偏移获取消息.
*/
public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}
普通消费者API通常用于实现高级API,以及用于一些离线消费者,这些消费者对于保持状态有特殊的要求
高级 API
/*创建一个对kafka集群的连接*/
ConsumerConnector connector = Consumer.create(consumerConfig);
interface ConsumerConnector {
/**
* 此方法用于获取KafkaStreams的集合,这个集合是
* MessageAndMetadata对象的迭代器,通过这个对象你可以获取到与元数据(目前仅指topic)相关联的消息
* Input: a map of <topic, #streams>
* Output: a map of <topic, list of message streams>
*/
public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
/**
* 你可以获取KafkaStreams的集合, 对符合TopicFilter过滤后topic消息进行迭代(TopicFilter是用标准Java正则表达式封装的topic白名单或黑名单)
*/
public List<KafkaStream> createMessageStreamsByFilter(
TopicFilter topicFilter, int numStreams);
/* 提交截止至目前所有的已消费的消息的偏移量 */
public commitOffsets()
/* 关闭连接 */
public shutdown()
}
这个API围绕迭代器并由KafkaStream类实现。每个kafkastream表示从一个或多个服务器的一个或多个分区的信息流。每个流用于单线程处理,所以客户端可以在创建调用中提供所需的流数。因此,流可能代表多个服务器分区的合并(对应于处理线程的数量),但每个分区只会流向一个流。
createMessageStreams方法调用已在某个topic注册的consumer,这将导致消费者/kafka节点分配的再平衡。API鼓励在一个调用中创建多个主题流,以最小化这种重新平衡。createMessageStreamsByFilter方法调用(额外的)注册的watcher去发现新的符合被过滤的topic。注意通过createMessageStreamsByFilter方法返回的每个流可能会迭代多个topic的消息(譬如,过滤器中允许多个topic)
5.2 网络层
Kafka网络层是一个相当简单的NIO服务器,这个将不会进行详细的阐述。sendfile的实现是由MessageSet
接口和writeTo
方法完成。这使得备份文件的信息集合,使用更有效的transferTo
实现而不是中间缓冲写。线程模型是一个单线程和用来处理每个固定连接数的N个处理器线程组成。这种设计已经在其他地方进行了充分的测试,并且被公认为是简单和快速的实现。该协议保持相当简洁的形式,以便将来更多其他类型语言的客户端实现。
5.3 消息
消息由固定大小的head、可变长度的不透明密钥键字节数组和可变长度的不透明值字节数组组成.消息头包含如下的一些字段:
– CRC32 用以检测消息的截取和损坏
– 格式版本
– 鉴别器的一个属性
– 时间戳
使键和值保持不透明是一个正确的确定:现在序列化包有很大的进展,任何特定的选择都不适合所有的使用.更不用说,一个特定的应用程序使用卡夫卡可能会指定一个特定的序列化类型作为其使用的一部分。MessageSet
接口仅仅只是一个迭代器,用于迭代方法产生的消息,这个方法对NiO通道进行批量读取和写入。