Kafka源码分析之Sender

        Sender为处理发送produce请求至Kafka集群的后台线程。这个线程更新集群元数据,然后发送produce请求至适当的节点。

        首先,我们先看下它的成员变量:

    /* the state of each nodes connection */
    // 每个节点连接的状态KafkaClient实例client
    private final KafkaClient client;

    /* the record accumulator that batches records */
    // 批量记录的记录累加器RecordAccumulator实例accumulator
    private final RecordAccumulator accumulator;

    /* the metadata for the client */
    // 客户端元数据Metadata实例metadata
    private final Metadata metadata;

    /* the maximum request size to attempt to send to the server */
    // 试图发送到server端的最大请求大小maxRequestSize
    private final int maxRequestSize;

    /* the number of acknowledgements to request from the server */
    // 从server端获得的请求发送的已确认数量acks
    private final short acks;

    /* the number of times to retry a failed request before giving up */
    // 一个失败请求在被放弃之前的重试次数retries
    private final int retries;

    /* the clock instance used for getting the time */
    // 获取时间的时钟Time实例time
    private final Time time;

    /* true while the sender thread is still running */
    // Sender线程运行的标志位,为true表示Sender线程一直在运行
    private volatile boolean running;

    /* true when the caller wants to ignore all unsent/inflight messages and force close.  */
    // 强制关闭的标志位forceClose
    private volatile boolean forceClose;

    /* metrics */
    // 度量指标
    private final SenderMetrics sensors;

    /* param clientId of the client */
    // 客户端的clientId
    private String clientId;

    /* the max time to wait for the server to respond to the request*/
    // 等到server端响应请求的超时时间requestTimeout
    private final int requestTimeout;

        既然是一个线程,我们看下它的主要运行逻辑run()方法,代码如下:

    /**
     * The main run loop for the sender thread
     * sender线程的主循环
     */
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        // 主循环,一直运行直到close被调用
        while (running) {// 标志位running为true,则一直循环
            try {
            	// 调用待参数的run()方法
                run(time.milliseconds());
            } catch (Exception e) {

            	// 截获异常后记录err级别log信息,输出异常
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        // 如果不是强制关闭,且消息累加器accumulator尚有消息未发送,或者客户端client尚有正在处理(in-flight)的请求
        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
            try {
            	// 调用调用待参数的run()方法继续处理
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        // 如果是强制关闭,调用消息累加器accumulator的abortIncompleteBatches(),放弃未处理完的请求
        if (forceClose) {
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.
            this.accumulator.abortIncompleteBatches();
        }

        // 关闭客户端
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

        Sender线程的主循环在run()方法内,其主要处理逻辑为:

        1、首先进入一个while主循环,当标志位running为true时一直循环,直到close被调用:

              调用带参数的run(long now)方法,处理消息的发送;

        2、当close被调用时,running被设置为false,while主循环退出:

              2.1、如果不是强制关闭,且消息累加器accumulator尚有消息未发送,或者客户端client尚有正在处理(in-flight)的请求,进入另外一个while循环,调用带参数的run(long now)方法,处理尚未发送完的消息的发送;

              2.2、如果是强制关闭,调用消息累加器accumulator的abortIncompleteBatches(),放弃未处理完的请求;

              2.3、关闭客户端。

时间: 2024-08-02 10:15:27

Kafka源码分析之Sender的相关文章

Apache Kafka源码分析 – Broker Server

1. Kafka.scala 在Kafka的main入口中startup KafkaServerStartable, 而KafkaServerStartable这是对KafkaServer的封装 1: val kafkaServerStartble = new KafkaServerStartable(serverConfig) 2: kafkaServerStartble.startup   1: package kafka.server 2: class KafkaServerStartab

Apache Kafka源码分析 - kafka controller

前面已经分析过kafka server的启动过程,以及server所能处理的所有的request,即KafkaApis  剩下的,其实关键就是controller,以及partition和replica的状态机  这里先看看controller在broker server的基础上,多做了哪些初始化和failover的工作   最关键的一句, private val controllerElector = new ZookeeperLeaderElector(controllerContext,

Kafka源码分析之KafkaProducer

        KafkaProducer是一个Kafka客户端实现,可以发布记录records至Kafka集群.KafkaProducer是线程安全的,多线程之间共享单独一个producer实例通常会比多个producer实例要快.KafkaProducer包含一组缓存池空间,存储尚未传输到集群的记录records,同时,一个后台的I/O线程负责将这些记录转换成请求,并发送至集群.使用之后关闭producer失败将会导致这些资源泄露.         我们看下KafkaProducer都有哪些

Kafka源码分析之KafkaProducer发送数据send()方法

        KafkaProducer是Kafka中Producer的一种实现,其主要功能就是发送消息给Kafka中broker.其send()方法如下: /** * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>. * See {@link #send(ProducerRecord, Callback)} for details. */ @Overri

Apache Kafka源码分析 – Log Management

LogManager LogManager会管理broker上所有的logs(在一个log目录下),一个topic的一个partition对应于一个log(一个log子目录) 首先loadLogs会加载每个partition所对应的log对象, 然后提供createLog,getLog,deleteLog之类的管理接口 并且会创建些后台线程来进行,cleanup,flush,checkpoint生成之类的工作 Log Log只是对于LogSegments的封装,包含loadSegments,ap

Apache Kafka源码分析 - KafkaApis

kafka apis反映出kafka broker server可以提供哪些服务, broker server主要和producer,consumer,controller有交互,搞清这些api就清楚了broker server的所有行为   handleOffsetRequest 提供对offset的查询的需求,比如查询earliest,latest offset是什么,或before某个时间戳的offset是什么 try { // ensure leader exists // 确定是否是l

Kafka源码分析之RecordBatch

        RecordBatch是Kafka中Producer中对批量记录的一个封装,它表示正在或将要被发送的一批记录.这个类不是线程安全的,当修改它时必须使用外部同步.RecordBatch中的成员变量如下: // 记录数目recordCount public int recordCount = 0; // 最大记录大小maxRecordSize public int maxRecordSize = 0; // 尝试次数attempts public volatile int attem

Apache Kafka源码分析 - autoLeaderRebalanceEnable

在broker的配置中,auto.leader.rebalance.enable (false) 那么这个leader是如何进行rebalance的? 首先在controller启动的时候会打开一个scheduler, if (config.autoLeaderRebalanceEnable) { //如果打开outoLeaderRebalance,需要把partiton leader由于dead而发生迁徙的,重新迁徙回去 info("starting the partition rebalan

Kafka源码分析之RecordAccumulator

        RecordAccumulator作为一个队列,累积记录records到MemoryRecords实例,然后被发送到服务器server.其成员变量如下: // RecordAccumulator是否关闭的标志位closed private volatile boolean closed; // 索引号drainIndex private int drainIndex; // flushes过程计数器flushesInProgress private final AtomicInt