Kafka源码分析之InFlightRequests

        InFlightRequests是对已经被发送或正在被发送但是均未接收到响应的客户端请求集合的一个封装,在它内部,有两个重要的变量,如下:

    // 每个连接最大执行中请求数
    private final int maxInFlightRequestsPerConnection;

    // 节点node至客户端请求双端队列Deque<ClientRequest>的映射集合
    private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();

        其中,requests是节点node至客户端请求双端队列Deque<ClientRequest>的映射集合,其中key为字符串形式的节点node,value为一个双端队列Deque,其中的元素为客户端请求ClientRequest。当有新请求需要处理时,会在队首入列,而实际被处理的请求,则是从队尾出列,保证入列早的请求先得到处理。maxInFlightRequestsPerConnection不消多说,它是限制每个连接,即每个node对应客户端请求队列大小的一个阈值。

        既然InFlightRequests本质上是对客户端请求按照node区分的一个双端队列映射集合,那么我们来看下它的队列入队及出队操作。

        首先,入队是通过add()方法来完成的,代码如下:

    /**
     * Add the given request to the queue for the connection it was directed to
     * 将给定请求添加至其对于连接的队列
     */
    public void add(ClientRequest request) {

    	// 从requests集合中根据给定请求的目的地node获取Deque<ClientRequest>双端队列reqs
        Deque<ClientRequest> reqs = this.requests.get(request.request().destination());

        // 如果双端队列reqs为nul
        if (reqs == null) {
        	// 构造一个双端队列ArrayDeque类型的reqs
            reqs = new ArrayDeque<>();

            // 将请求目的地node至reqs的对应关系添加到requests集合
            this.requests.put(request.request().destination(), reqs);
        }

        // reqs队列首部添加请求request,使用的是addFirst()方法
        reqs.addFirst(request);
    }

        逻辑很简单,大体如下:

        1、从requests集合中根据给定请求的目的地node获取Deque<ClientRequest>双端队列reqs;

        2、如果双端队列reqs为nul:

              2.1、构造一个双端队列ArrayDeque类型的reqs;

              2.2、将请求目的地node至reqs的对应关系添加到requests集合;

        3、reqs队列首部添加请求request,使用的是addFirst()方法。

        这里,最关键的一点是它将请求添加至双端队列的队首,使用的是addFirst()方法。

        出队是通过completeNext()来实现的,代码如下:

    /**
     * Get the oldest request (the one that that will be completed next) for the given node
     * 获取给定节点node的时间最久执行中请求,作为接下来要完成的请求
     */
    public ClientRequest completeNext(String node) {

    	// 根据给定节点node获取客户端请求双端队列reqs,并从poll出队尾元素
    	// add时是通过addFirst()方法添加到队首的,所以队尾的元素是时间最久的,也是应该先处理的
    	return requestQueue(node).pollLast();
    }

        completeNext()方法根据给定节点node获取客户端请求双端队列reqs,并从poll出队尾元素。所以,这里我们简单总结下,add时是通过addFirst()方法添加到队首的,所以队尾的元素是时间最久的,也是应该先处理的,故出队应该用pollLast(),将存储时间最久的元素移出进行处理。
 

时间: 2024-09-17 11:18:20

Kafka源码分析之InFlightRequests的相关文章

Apache Kafka源码分析 – Broker Server

1. Kafka.scala 在Kafka的main入口中startup KafkaServerStartable, 而KafkaServerStartable这是对KafkaServer的封装 1: val kafkaServerStartble = new KafkaServerStartable(serverConfig) 2: kafkaServerStartble.startup   1: package kafka.server 2: class KafkaServerStartab

Apache Kafka源码分析 - kafka controller

前面已经分析过kafka server的启动过程,以及server所能处理的所有的request,即KafkaApis  剩下的,其实关键就是controller,以及partition和replica的状态机  这里先看看controller在broker server的基础上,多做了哪些初始化和failover的工作   最关键的一句, private val controllerElector = new ZookeeperLeaderElector(controllerContext,

Apache Kafka源码分析 – Log Management

LogManager LogManager会管理broker上所有的logs(在一个log目录下),一个topic的一个partition对应于一个log(一个log子目录) 首先loadLogs会加载每个partition所对应的log对象, 然后提供createLog,getLog,deleteLog之类的管理接口 并且会创建些后台线程来进行,cleanup,flush,checkpoint生成之类的工作 Log Log只是对于LogSegments的封装,包含loadSegments,ap

Apache Kafka源码分析 - KafkaApis

kafka apis反映出kafka broker server可以提供哪些服务, broker server主要和producer,consumer,controller有交互,搞清这些api就清楚了broker server的所有行为   handleOffsetRequest 提供对offset的查询的需求,比如查询earliest,latest offset是什么,或before某个时间戳的offset是什么 try { // ensure leader exists // 确定是否是l

Kafka源码分析之Sender

        Sender为处理发送produce请求至Kafka集群的后台线程.这个线程更新集群元数据,然后发送produce请求至适当的节点.         首先,我们先看下它的成员变量: /* the state of each nodes connection */ // 每个节点连接的状态KafkaClient实例client private final KafkaClient client; /* the record accumulator that batches recor

Kafka源码分析之KafkaProducer

        KafkaProducer是一个Kafka客户端实现,可以发布记录records至Kafka集群.KafkaProducer是线程安全的,多线程之间共享单独一个producer实例通常会比多个producer实例要快.KafkaProducer包含一组缓存池空间,存储尚未传输到集群的记录records,同时,一个后台的I/O线程负责将这些记录转换成请求,并发送至集群.使用之后关闭producer失败将会导致这些资源泄露.         我们看下KafkaProducer都有哪些

Kafka源码分析之RecordBatch

        RecordBatch是Kafka中Producer中对批量记录的一个封装,它表示正在或将要被发送的一批记录.这个类不是线程安全的,当修改它时必须使用外部同步.RecordBatch中的成员变量如下: // 记录数目recordCount public int recordCount = 0; // 最大记录大小maxRecordSize public int maxRecordSize = 0; // 尝试次数attempts public volatile int attem

Apache Kafka源码分析 - autoLeaderRebalanceEnable

在broker的配置中,auto.leader.rebalance.enable (false) 那么这个leader是如何进行rebalance的? 首先在controller启动的时候会打开一个scheduler, if (config.autoLeaderRebalanceEnable) { //如果打开outoLeaderRebalance,需要把partiton leader由于dead而发生迁徙的,重新迁徙回去 info("starting the partition rebalan

Kafka源码分析之RecordAccumulator

        RecordAccumulator作为一个队列,累积记录records到MemoryRecords实例,然后被发送到服务器server.其成员变量如下: // RecordAccumulator是否关闭的标志位closed private volatile boolean closed; // 索引号drainIndex private int drainIndex; // flushes过程计数器flushesInProgress private final AtomicInt