Kafka源码分析之RecordBatch

        RecordBatch是Kafka中Producer中对批量记录的一个封装,它表示正在或将要被发送的一批记录。这个类不是线程安全的,当修改它时必须使用外部同步。RecordBatch中的成员变量如下:

    // 记录数目recordCount
    public int recordCount = 0;

    // 最大记录大小maxRecordSize
    public int maxRecordSize = 0;

    // 尝试次数attempts
    public volatile int attempts = 0;

    // RecordBatch创建时间createdMs
    public final long createdMs;

    public long drainedMs;

    // 上次尝试时间lastAttemptMs
    public long lastAttemptMs;

    // 内存记录MemoryRecords,在内存中存储Record
    public final MemoryRecords records;

    // 主题和分区的复合体topicPartition
    public final TopicPartition topicPartition;

    // Produce请求结果ProduceRequestResult实例produceFuture
    public final ProduceRequestResult produceFuture;

    // 上次添加记录时间lastAppendTime
    public long lastAppendTime;

    // 回调函数结构体Thunk列表thunks
    private final List<Thunk> thunks;

    // 重试标志位retry
    private boolean retry;

        RecordBatch中最主要的一个成员变量是MemoryRecords类型的records,它是Producer发送的记录record在内存中的一个数据集,另外还有一些记录数目recordCount、最大记录大小maxRecordSize、RecordBatch创建时间createdMs、上次尝试时间lastAttemptMs、主题和分区的复合体topicPartition、Produce请求结果ProduceRequestResult实例produceFuture、回调函数结构体列表thunks等重要变量。

        Thunk是RecordBatch的一个静态内部类,它是对Produce记录回调函数Callback和其相关参数FutureRecordMetadata的一个封装,其定义如下:

    /**
     * A callback and the associated FutureRecordMetadata argument to pass to it.
     */
    final private static class Thunk {
        final Callback callback;
        final FutureRecordMetadata future;

        public Thunk(Callback callback, FutureRecordMetadata future) {
            this.callback = callback;
            this.future = future;
        }
    }

         再看它的构造函数,如下:

    // 构造函数
    public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {

    	// RecordBatch创建时间createdMs赋值为now
    	this.createdMs = now;

    	// 上次尝试时间lastAttemptMs赋值为now
        this.lastAttemptMs = now;

        // 记录records赋值为records
        this.records = records;

        // 主题分区topicPartition赋值为tp
        this.topicPartition = tp;

        // 构造Produce请求结果ProduceRequestResult实例produceFuture
        this.produceFuture = new ProduceRequestResult();

        // 构造回调函数结构体列表thunks
        this.thunks = new ArrayList<Thunk>();

        // 上次添加记录时间lastAppendTime赋值为创建时间createdMs,也就是now
        this.lastAppendTime = createdMs;

        // 重试标志位retry默认为false
        this.retry = false;
    }

        既然为批量记录,那么RecordBatch的最主要一个功能就是添加记录,而记录又不可能无限制添加,所以tryAppend()方法就是完成这个功能的。它尝试添加记录,如果添加成功,则返回FutureRecordMetadata实例,其中包含了记录在记录集中的相对偏移量,否则返回null,代码如下:

    /**
     * Append the record to the current record set and return the relative offset within that record set
     * 添加记录到当前记录集合,返回记录在记录集中的相对偏移量
     *
     * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
     */
    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback, long now) {

    	if (!this.records.hasRoomFor(key, value)) {// 如果内存记录MemoryRecords实例records没有余地
    		// 直接返回null
            return null;
        } else {

        	// 将key、value添加进内存记录MemoryRecords实例records中
            this.records.append(0L, key, value);

            // 如果需要,更新最大记录大小maxRecordSize
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));

            // 上次添加记录时间lastAppendTime赋值为now
            this.lastAppendTime = now;

            // 构造FutureRecordMetadata实例future
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);

            // 将回调函数callback构造成Thunk对象添加到thunks列表
            if (callback != null)
                thunks.add(new Thunk(callback, future));

            // 记录数目recordCount加1
            this.recordCount++;

            // 返回FutureRecordMetadata实例future
            return future;
        }
    }

        tryAppend()方法需要四个参数,键key、值value、回调函数callback、添加时间now,其处理逻辑如下:

        1、首先判断内存记录MemoryRecords实例records中有没有余地存储该key、value,如果没有,直接返回null,否则继续;

        2、调用MemoryRecords的append()方法,将key、value添加进内存记录MemoryRecords实例records中;

        3、如果key、value对应大小超过当前maxRecordSize,更新最大记录大小maxRecordSize;

        4、更新上次添加记录时间lastAppendTime为now;

        5、根据produceFuture、recordCount构造FutureRecordMetadata实例future;

        6、利用future将回调函数callback构造成Thunk对象添加到thunks列表;

        7、记录数目recordCount加1;

        8、返回FutureRecordMetadata实例future。

        

时间: 2024-09-21 05:41:43

Kafka源码分析之RecordBatch的相关文章

Apache Kafka源码分析 – Broker Server

1. Kafka.scala 在Kafka的main入口中startup KafkaServerStartable, 而KafkaServerStartable这是对KafkaServer的封装 1: val kafkaServerStartble = new KafkaServerStartable(serverConfig) 2: kafkaServerStartble.startup   1: package kafka.server 2: class KafkaServerStartab

Apache Kafka源码分析 - kafka controller

前面已经分析过kafka server的启动过程,以及server所能处理的所有的request,即KafkaApis  剩下的,其实关键就是controller,以及partition和replica的状态机  这里先看看controller在broker server的基础上,多做了哪些初始化和failover的工作   最关键的一句, private val controllerElector = new ZookeeperLeaderElector(controllerContext,

Kafka源码分析之RecordAccumulator

        RecordAccumulator作为一个队列,累积记录records到MemoryRecords实例,然后被发送到服务器server.其成员变量如下: // RecordAccumulator是否关闭的标志位closed private volatile boolean closed; // 索引号drainIndex private int drainIndex; // flushes过程计数器flushesInProgress private final AtomicInt

Apache Kafka源码分析 – Log Management

LogManager LogManager会管理broker上所有的logs(在一个log目录下),一个topic的一个partition对应于一个log(一个log子目录) 首先loadLogs会加载每个partition所对应的log对象, 然后提供createLog,getLog,deleteLog之类的管理接口 并且会创建些后台线程来进行,cleanup,flush,checkpoint生成之类的工作 Log Log只是对于LogSegments的封装,包含loadSegments,ap

Apache Kafka源码分析 - KafkaApis

kafka apis反映出kafka broker server可以提供哪些服务, broker server主要和producer,consumer,controller有交互,搞清这些api就清楚了broker server的所有行为   handleOffsetRequest 提供对offset的查询的需求,比如查询earliest,latest offset是什么,或before某个时间戳的offset是什么 try { // ensure leader exists // 确定是否是l

Kafka源码分析之Sender

        Sender为处理发送produce请求至Kafka集群的后台线程.这个线程更新集群元数据,然后发送produce请求至适当的节点.         首先,我们先看下它的成员变量: /* the state of each nodes connection */ // 每个节点连接的状态KafkaClient实例client private final KafkaClient client; /* the record accumulator that batches recor

Kafka源码分析之KafkaProducer

        KafkaProducer是一个Kafka客户端实现,可以发布记录records至Kafka集群.KafkaProducer是线程安全的,多线程之间共享单独一个producer实例通常会比多个producer实例要快.KafkaProducer包含一组缓存池空间,存储尚未传输到集群的记录records,同时,一个后台的I/O线程负责将这些记录转换成请求,并发送至集群.使用之后关闭producer失败将会导致这些资源泄露.         我们看下KafkaProducer都有哪些

Apache Kafka源码分析 - autoLeaderRebalanceEnable

在broker的配置中,auto.leader.rebalance.enable (false) 那么这个leader是如何进行rebalance的? 首先在controller启动的时候会打开一个scheduler, if (config.autoLeaderRebalanceEnable) { //如果打开outoLeaderRebalance,需要把partiton leader由于dead而发生迁徙的,重新迁徙回去 info("starting the partition rebalan

Kafka源码分析之KafkaProducer发送数据send()方法

        KafkaProducer是Kafka中Producer的一种实现,其主要功能就是发送消息给Kafka中broker.其send()方法如下: /** * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>. * See {@link #send(ProducerRecord, Callback)} for details. */ @Overri