Kafka Source Code Analysis: RecordAccumulator

        RecordAccumulator acts as a queue that accumulates records into MemoryRecords instances, which are then sent to the server. Its member variables are listed below; a simplified sketch of how these pieces fit together follows the list.

    // flag indicating whether this RecordAccumulator has been closed
    private volatile boolean closed;

    // index used when draining batches from partitions
    private int drainIndex;

    // counter of flushes currently in progress
    private final AtomicInteger flushesInProgress;

    // counter of appends currently in progress
    private final AtomicInteger appendsInProgress;

    // batch size in bytes
    private final int batchSize;

    // compression codec, a CompressionType instance
    private final CompressionType compression;

    // linger time lingerMs
    private final long lingerMs;

    // retry backoff time retryBackoffMs
    private final long retryBackoffMs;

    // buffer pool, of type BufferPool
    private final BufferPool free;

    private final Time time;

    // ConcurrentMap from TopicPartition to a deque of RecordBatch instances
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;

    // batches that have not yet been completed, of type IncompleteRecordBatches
    private final IncompleteRecordBatches incomplete;
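
        To make the roles of batches and batchSize concrete, here is a minimal, self-contained sketch (hypothetical code, not the actual Kafka source): a concurrent map from partition to a per-partition deque of batches, where an append either reuses the last open batch or starts a new one. The names SimpleAccumulator, SimpleBatch and appendRecord are invented for illustration only, and a plain String key stands in for TopicPartition.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Hypothetical, simplified illustration of the batches structure; not Kafka source code.
    public class SimpleAccumulator {

        // stand-in for RecordBatch: a fixed-capacity container of records
        static final class SimpleBatch {
            private final int capacity;
            private int recordCount = 0;

            SimpleBatch(int capacity) {
                this.capacity = capacity;
            }

            // returns false when the batch is full, analogous to RecordBatch.tryAppend() returning null
            boolean tryAppend(byte[] record) {
                if (recordCount >= capacity)
                    return false;
                recordCount++;
                return true;
            }
        }

        private final int batchCapacity = 16; // records per batch in this toy example

        // per-partition deques of batches, analogous to the batches member above
        private final ConcurrentMap<String, Deque<SimpleBatch>> batches =
                new ConcurrentHashMap<String, Deque<SimpleBatch>>();

        // append a record for a partition: reuse the last open batch, or open a new one
        public void appendRecord(String topicPartition, byte[] record) {
            Deque<SimpleBatch> deque = batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<SimpleBatch>());
            // each per-partition deque is guarded by its own lock
            synchronized (deque) {
                SimpleBatch last = deque.peekLast();
                if (last == null || !last.tryAppend(record)) {
                    SimpleBatch fresh = new SimpleBatch(batchCapacity);
                    fresh.tryAppend(record);
                    deque.addLast(fresh);
                }
            }
        }
    }

        The real RecordAccumulator differs in that batches is a CopyOnWriteMap (suited to the read-mostly pattern of looking up an existing deque), batch capacity is measured in bytes backed by buffers from the BufferPool free rather than in record counts, and ready batches are later drained and sent by the Sender thread.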

        Next, let's look at RecordAccumulator's constructor:

    /**
     * Create a new record accumulator
     *
     * @param batchSize The size to use when allocating {@link org.apache.kafka.common.record.MemoryRecords} instances
     * @param totalSize The maximum memory the record accumulator can use.
     * @param compression The compression codec for the records
     * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
     *        sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
     *        latency for potentially better throughput due to more batching (and hence fewer, larger requests).
     * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids
     *        exhausting all retries in a short period of time.
     * @param metrics The metrics
     * @param time The time instance to use
     * @param metricTags additional key/value attributes of the metric
     */
    public RecordAccumulator(int batchSize,
                             long totalSize,
                             CompressionType compression,
                             long lingerMs,
                             long retryBackoffMs,
                             Metrics metrics,
                             Time time,
                             Map<String, String> metricTags) {

        // initialize the drain index drainIndex to 0
        this.drainIndex = 0;

        // initialize the closed flag to false
        this.closed = false;

        // initialize the flushes-in-progress counter flushesInProgress to 0
        this.flushesInProgress = new AtomicInteger(0);

        // initialize the appends-in-progress counter appendsInProgress to 0
        this.appendsInProgress = new AtomicInteger(0);

        // initialize batchSize, compression, lingerMs and retryBackoffMs from the constructor arguments
        this.batchSize = batchSize;
        this.compression = compression;
        this.lingerMs = lingerMs;
        this.retryBackoffMs = retryBackoffMs;

        // initialize batches as a CopyOnWriteMap
        this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();

        String metricGrpName = "producer-metrics";

        // construct a BufferPool instance to initialize the member variable free
        this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName, metricTags);

        // construct an IncompleteRecordBatches instance to initialize the member variable incomplete
        this.incomplete = new IncompleteRecordBatches();

        // initialize the member variable time from the constructor argument time
        this.time = time;

        // call registerMetrics() to register the metrics
        registerMetrics(metrics, metricGrpName, metricTags);
    }
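
        As a usage sketch, constructing the accumulator through the signature above might look like the following. The numeric values are illustrative only, not defaults taken from this source, and the sketch assumes org.apache.kafka.common.utils.SystemTime and the no-argument Metrics constructor; in practice KafkaProducer builds the RecordAccumulator from its configuration in a similar way.

    // Illustrative construction; the concrete values are examples, not Kafka defaults.
    Map<String, String> metricTags = new LinkedHashMap<String, String>();
    metricTags.put("client-id", "demo-producer");   // tag attached to the accumulator's metrics

    RecordAccumulator accumulator = new RecordAccumulator(
            16384,                  // batchSize: bytes allocated per MemoryRecords instance
            32 * 1024 * 1024L,      // totalSize: upper bound on memory the accumulator may use
            CompressionType.NONE,   // compression codec for the records
            5L,                     // lingerMs: wait up to 5 ms for more records before a non-full batch is ready
            100L,                   // retryBackoffMs: delay before retrying a failed produce request
            new Metrics(),          // metrics registry
            new SystemTime(),       // Time implementation
            metricTags);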

        
