RecordBatch是Kafka中Producer中对批量记录的一个封装,它表示正在或将要被发送的一批记录。这个类不是线程安全的,当修改它时必须使用外部同步。RecordBatch中的成员变量如下:
// 记录数目recordCount public int recordCount = 0; // 最大记录大小maxRecordSize public int maxRecordSize = 0; // 尝试次数attempts public volatile int attempts = 0; // RecordBatch创建时间createdMs public final long createdMs; public long drainedMs; // 上次尝试时间lastAttemptMs public long lastAttemptMs; // 内存记录MemoryRecords,在内存中存储Record public final MemoryRecords records; // 主题和分区的复合体topicPartition public final TopicPartition topicPartition; // Produce请求结果ProduceRequestResult实例produceFuture public final ProduceRequestResult produceFuture; // 上次添加记录时间lastAppendTime public long lastAppendTime; // 回调函数结构体Thunk列表thunks private final List<Thunk> thunks; // 重试标志位retry private boolean retry;
RecordBatch中最主要的一个成员变量是MemoryRecords类型的records,它是Producer发送的记录record在内存中的一个数据集,另外还有一些记录数目recordCount、最大记录大小maxRecordSize、RecordBatch创建时间createdMs、上次尝试时间lastAttemptMs、主题和分区的复合体topicPartition、Produce请求结果ProduceRequestResult实例produceFuture、回调函数结构体列表thunks等重要变量。
Thunk是RecordBatch的一个静态内部类,它是对Produce记录回调函数Callback和其相关参数FutureRecordMetadata的一个封装,其定义如下:
/** * A callback and the associated FutureRecordMetadata argument to pass to it. */ final private static class Thunk { final Callback callback; final FutureRecordMetadata future; public Thunk(Callback callback, FutureRecordMetadata future) { this.callback = callback; this.future = future; } }
再看它的构造函数,如下:
// 构造函数 public RecordBatch(TopicPartition tp, MemoryRecords records, long now) { // RecordBatch创建时间createdMs赋值为now this.createdMs = now; // 上次尝试时间lastAttemptMs赋值为now this.lastAttemptMs = now; // 记录records赋值为records this.records = records; // 主题分区topicPartition赋值为tp this.topicPartition = tp; // 构造Produce请求结果ProduceRequestResult实例produceFuture this.produceFuture = new ProduceRequestResult(); // 构造回调函数结构体列表thunks this.thunks = new ArrayList<Thunk>(); // 上次添加记录时间lastAppendTime赋值为创建时间createdMs,也就是now this.lastAppendTime = createdMs; // 重试标志位retry默认为false this.retry = false; }
既然为批量记录,那么RecordBatch的最主要一个功能就是添加记录,而记录又不可能无限制添加,所以tryAppend()方法就是完成这个功能的。它尝试添加记录,如果添加成功,则返回FutureRecordMetadata实例,其中包含了记录在记录集中的相对偏移量,否则返回null,代码如下:
/** * Append the record to the current record set and return the relative offset within that record set * 添加记录到当前记录集合,返回记录在记录集中的相对偏移量 * * @return The RecordSend corresponding to this record or null if there isn't sufficient room. */ public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback, long now) { if (!this.records.hasRoomFor(key, value)) {// 如果内存记录MemoryRecords实例records没有余地 // 直接返回null return null; } else { // 将key、value添加进内存记录MemoryRecords实例records中 this.records.append(0L, key, value); // 如果需要,更新最大记录大小maxRecordSize this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value)); // 上次添加记录时间lastAppendTime赋值为now this.lastAppendTime = now; // 构造FutureRecordMetadata实例future FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount); // 将回调函数callback构造成Thunk对象添加到thunks列表 if (callback != null) thunks.add(new Thunk(callback, future)); // 记录数目recordCount加1 this.recordCount++; // 返回FutureRecordMetadata实例future return future; } }
tryAppend()方法需要四个参数,键key、值value、回调函数callback、添加时间now,其处理逻辑如下:
1、首先判断内存记录MemoryRecords实例records中有没有余地存储该key、value,如果没有,直接返回null,否则继续;
2、调用MemoryRecords的append()方法,将key、value添加进内存记录MemoryRecords实例records中;
3、如果key、value对应大小超过当前maxRecordSize,更新最大记录大小maxRecordSize;
4、更新上次添加记录时间lastAppendTime为now;
5、根据produceFuture、recordCount构造FutureRecordMetadata实例future;
6、利用future将回调函数callback构造成Thunk对象添加到thunks列表;
7、记录数目recordCount加1;
8、返回FutureRecordMetadata实例future。