Kafka源码分析之KafkaProducer

        KafkaProducer是一个Kafka客户端实现,可以发布记录records至Kafka集群。KafkaProducer是线程安全的,多线程之间共享单独一个producer实例通常会比多个producer实例要快。KafkaProducer包含一组缓存池空间,存储尚未传输到集群的记录records,同时,一个后台的I/O线程负责将这些记录转换成请求,并发送至集群。使用之后关闭producer失败将会导致这些资源泄露。

        我们看下KafkaProducer都有哪些成员变量,如下:

    // 客户端ID:clientId
    private String clientId;

    // 分区器Partitioner实例partitioner
    private final Partitioner partitioner;

    // 最大请求大小maxRequestSize
    private final int maxRequestSize;

    // 内存总计大小totalMemorySize
    private final long totalMemorySize;

    // 集群元数据Metadata实例metadata
    private final Metadata metadata;

    // 记录收集器RecordAccumulator实例accumulator
    private final RecordAccumulator accumulator;

    // 后台发送线程Sender实例sender
    private final Sender sender;

    // 指标度量
    private final Metrics metrics;

    // io线程ioThread
    private final Thread ioThread;

    // 压缩类型CompressionType实例compressionType
    private final CompressionType compressionType;

    private final Sensor errors;

    // 时间器
    private final Time time;

    // key序列化器keySerializer
    private final Serializer<K> keySerializer;

    // value序列化器valueSerializer
    private final Serializer<V> valueSerializer;

    // Producer配置信息ProducerConfig实例producerConfig
    private final ProducerConfig producerConfig;

    // 最大阻塞时间maxBlockTimeMs
    private final long maxBlockTimeMs;

    // 请求超时时间requestTimeoutMs
    private final int requestTimeoutMs;

        其中,比较重要的几个是:

        1、分区器Partitioner实例partitioner:由它负责计算分区,确定主题内的实际存储位置;

        2、集群元数据Metadata实例metadata:存储了整个集群的元数据信息,包括节点列表、主题列表、主题与分区列表映射等信息;

        3、记录收集器RecordAccumulator实例accumulator:通过KafkaProducer发送的消息不会立即被发送到集群,而是先缓存在客户端内存缓冲池中,等待后台I/O线程处理;

        4、后台发送线程Sender实例sender:一个后台工作的I/O线程,由它负责将客户端内存缓冲池中的数据发送到集群;

        KafkaProducer的构造方法

时间: 2024-10-24 14:59:10

Kafka源码分析之KafkaProducer的相关文章

Kafka源码分析之KafkaProducer发送数据send()方法

        KafkaProducer是Kafka中Producer的一种实现,其主要功能就是发送消息给Kafka中broker.其send()方法如下: /** * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>. * See {@link #send(ProducerRecord, Callback)} for details. */ @Overri

Apache Kafka源码分析 – Broker Server

1. Kafka.scala 在Kafka的main入口中startup KafkaServerStartable, 而KafkaServerStartable这是对KafkaServer的封装 1: val kafkaServerStartble = new KafkaServerStartable(serverConfig) 2: kafkaServerStartble.startup   1: package kafka.server 2: class KafkaServerStartab

Apache Kafka源码分析 - kafka controller

前面已经分析过kafka server的启动过程,以及server所能处理的所有的request,即KafkaApis  剩下的,其实关键就是controller,以及partition和replica的状态机  这里先看看controller在broker server的基础上,多做了哪些初始化和failover的工作   最关键的一句, private val controllerElector = new ZookeeperLeaderElector(controllerContext,

Apache Kafka源码分析 – Log Management

LogManager LogManager会管理broker上所有的logs(在一个log目录下),一个topic的一个partition对应于一个log(一个log子目录) 首先loadLogs会加载每个partition所对应的log对象, 然后提供createLog,getLog,deleteLog之类的管理接口 并且会创建些后台线程来进行,cleanup,flush,checkpoint生成之类的工作 Log Log只是对于LogSegments的封装,包含loadSegments,ap

Apache Kafka源码分析 - KafkaApis

kafka apis反映出kafka broker server可以提供哪些服务, broker server主要和producer,consumer,controller有交互,搞清这些api就清楚了broker server的所有行为   handleOffsetRequest 提供对offset的查询的需求,比如查询earliest,latest offset是什么,或before某个时间戳的offset是什么 try { // ensure leader exists // 确定是否是l

Kafka源码分析之Sender

        Sender为处理发送produce请求至Kafka集群的后台线程.这个线程更新集群元数据,然后发送produce请求至适当的节点.         首先,我们先看下它的成员变量: /* the state of each nodes connection */ // 每个节点连接的状态KafkaClient实例client private final KafkaClient client; /* the record accumulator that batches recor

Kafka源码分析之RecordBatch

        RecordBatch是Kafka中Producer中对批量记录的一个封装,它表示正在或将要被发送的一批记录.这个类不是线程安全的,当修改它时必须使用外部同步.RecordBatch中的成员变量如下: // 记录数目recordCount public int recordCount = 0; // 最大记录大小maxRecordSize public int maxRecordSize = 0; // 尝试次数attempts public volatile int attem

Apache Kafka源码分析 - autoLeaderRebalanceEnable

在broker的配置中,auto.leader.rebalance.enable (false) 那么这个leader是如何进行rebalance的? 首先在controller启动的时候会打开一个scheduler, if (config.autoLeaderRebalanceEnable) { //如果打开outoLeaderRebalance,需要把partiton leader由于dead而发生迁徙的,重新迁徙回去 info("starting the partition rebalan

Kafka源码分析之RecordAccumulator

        RecordAccumulator作为一个队列,累积记录records到MemoryRecords实例,然后被发送到服务器server.其成员变量如下: // RecordAccumulator是否关闭的标志位closed private volatile boolean closed; // 索引号drainIndex private int drainIndex; // flushes过程计数器flushesInProgress private final AtomicInt