


    /* the state of each nodes connection */
    // 每个节点连接的状态KafkaClient实例client
    private final KafkaClient client;

    /* the record accumulator that batches records */
    // 批量记录的记录累加器RecordAccumulator实例accumulator
    private final RecordAccumulator accumulator;

    /* the metadata for the client */
    // 客户端元数据Metadata实例metadata
    private final Metadata metadata;

    /* the maximum request size to attempt to send to the server */
    // 试图发送到server端的最大请求大小maxRequestSize
    private final int maxRequestSize;

    /* the number of acknowledgements to request from the server */
    // 从server端获得的请求发送的已确认数量acks
    private final short acks;

    /* the number of times to retry a failed request before giving up */
    // 一个失败请求在被放弃之前的重试次数retries
    private final int retries;

    /* the clock instance used for getting the time */
    // 获取时间的时钟Time实例time
    private final Time time;

    /* true while the sender thread is still running */
    // Sender线程运行的标志位,为true表示Sender线程一直在运行
    private volatile boolean running;

    /* true when the caller wants to ignore all unsent/inflight messages and force close.  */
    // 强制关闭的标志位forceClose
    private volatile boolean forceClose;

    /* metrics */
    // 度量指标
    private final SenderMetrics sensors;

    /* param clientId of the client */
    // 客户端的clientId
    private String clientId;

    /* the max time to wait for the server to respond to the request*/
    // 等到server端响应请求的超时时间requestTimeout
    private final int requestTimeout;


     * The main run loop for the sender thread
     * sender线程的主循环
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        // 主循环,一直运行直到close被调用
        while (running) {// 标志位running为true,则一直循环
            try {
            	// 调用待参数的run()方法
            } catch (Exception e) {

            	// 截获异常后记录err级别log信息,输出异常
                log.error("Uncaught error in kafka producer I/O thread: ", e);

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        // 如果不是强制关闭,且消息累加器accumulator尚有消息未发送,或者客户端client尚有正在处理(in-flight)的请求
        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
            try {
            	// 调用调用待参数的run()方法继续处理
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);

        // 如果是强制关闭,调用消息累加器accumulator的abortIncompleteBatches(),放弃未处理完的请求
        if (forceClose) {
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.

        // 关闭客户端
        try {
        } catch (Exception e) {
            log.error("Failed to close network client", e);

        log.debug("Shutdown of Kafka producer I/O thread has completed.");



              调用带参数的run(long now)方法,处理消息的发送;


              2.1、如果不是强制关闭,且消息累加器accumulator尚有消息未发送,或者客户端client尚有正在处理(in-flight)的请求,进入另外一个while循环,调用带参数的run(long now)方法,处理尚未发送完的消息的发送;



