RecordAccumulator作为一个队列,累积记录records到MemoryRecords实例,然后被发送到服务器server。其成员变量如下:
// RecordAccumulator是否关闭的标志位closed private volatile boolean closed; // 索引号drainIndex private int drainIndex; // flushes过程计数器flushesInProgress private final AtomicInteger flushesInProgress; // appends过程计数器appendsInProgress private final AtomicInteger appendsInProgress; // 批量大小batchSize private final int batchSize; // 压缩器类型CompressionType实例compression private final CompressionType compression; // 延迟时间lingerMs private final long lingerMs; // 重试时间retryBackoffMs private final long retryBackoffMs; // 缓冲池BufferPool类型的free private final BufferPool free; private final Time time; // TopicPartitiond到RecordBatch双端队列的ConcurrentMap集合batches private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches; // 处于完成状态的批量记录IncompleteRecordBatches类型的incomplete private final IncompleteRecordBatches incomplete;
再看下RecordAccumulator的构造方法,代码如下:
/** * Create a new record accumulator * * @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances * @param totalSize The maximum memory the record accumulator can use. * @param compression The compression codec for the records * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for * sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some * latency for potentially better throughput due to more batching (and hence fewer, larger requests). * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids * exhausting all retries in a short period of time. * @param metrics The metrics * @param time The time instance to use * @param metricTags additional key/value attributes of the metric */ public RecordAccumulator(int batchSize, long totalSize, CompressionType compression, long lingerMs, long retryBackoffMs, Metrics metrics, Time time, Map<String, String> metricTags) { // 序号drainIndex初始化为0 this.drainIndex = 0; // 标志位closed初始化为false this.closed = false; // flushes过程计数器flushesInProgress初始化为0 this.flushesInProgress = new AtomicInteger(0); // appends过程计数器appendsInProgress初始化为0 this.appendsInProgress = new AtomicInteger(0); // 根据入参初始化batchSize、compression、lingerMs、retryBackoffMs等成员变量 this.batchSize = batchSize; this.compression = compression; this.lingerMs = lingerMs; this.retryBackoffMs = retryBackoffMs; // batches初始化CopyOnWriteMap集合 this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>(); String metricGrpName = "producer-metrics"; // 构造BufferPool实例,初始化成员变量free this.free = new BufferPool(totalSize, batchSize, metrics, time , metricGrpName , metricTags); // 构造IncompleteRecordBatches实例,初始化成员变量incomplete this.incomplete = new IncompleteRecordBatches(); // 根据入参time初始化成员变量time this.time = time; // 调用registerMetrics()注册度量信息 registerMetrics(metrics, metricGrpName, metricTags); }
时间: 2024-09-21 05:48:59