【Kafka源码】ReplicaManager启动过程

在KafkaServer启动过程的入口中,会启动Replica Manager,众所周知,这是一个副本管理器。replica在Kafka中扮演的角色很重要,是保证消息不丢失的一个重要概念。

replica的个人理解概念如下:producer发送的消息给broker,broker是分为多个partition的,对于同一个partition中的broker,这些机器是有主从的概念的。producer只会向leader写入消息,consumer只会从leader读取消息,(leader负责读写,replica保证消息不丢)。为了保证消息不丢失,follower会定时从leader拉取消息,保持与leader的消息同步。当然,producer可以配置是否需要有follower同步成功,以及需要多少个replica,(即需要多少个ack)才算是消息发送成功。这块看个人的需求。

下面我们看下Replica Manager的启动过程。

一、入口

入口在KafkaServer的start方法中,比较简洁:

replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,isShuttingDown)
replicaManager.startup()

我们主要看下ReplicaManager里面都有什么内容。

二、ReplicaManager实例化

我们看看实例化的过程:

/ epoch of the controller that last changed the leader /
@volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
private val localBrokerId = config.brokerId
private val allPartitions = new Pool[(String, Int), Partition](valueFactory = Some { case (t, p) =>
  new Partition(t, p, time, this)
})
private val replicaStateChangeLock = new Object
val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, jTime, threadNamePrefix)
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
private var hwThreadInitialized = false
this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
val stateChangeLogger = KafkaController.stateChangeLogger
private val isrChangeSet: mutable.Set[TopicAndPartition] = new mutable.HashSet[TopicAndPartition]()
private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())

val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce](
    purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
    purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests)
  • 首先是这个controllerEpoch,这个值表示的是leader发生变化时controller的epoch。epoch存储在zk中的/Controller_epoch中。
  • 第二步是从配置broker.id中获取当前机器的brokerId。
  • 实例化ReplicaFetcherManager,是一个follower从leader拉取消息的管理器,这里面有文章。
  • 设置highWatermarkCheckPointThreadStarted为false,为了后续启动相关的线程用。
  • 从文件(replication-offset-checkpoint)中获取所有topic和partition的HW,这个文件中存储了每个topic和partition对应的最新的checkPoint对应的offset值。HW表示的是topic的partition对应的最后一次commit的消息的offset值,也是用于消息完整性的保证。
  • 定义了isrChangerSet,表示了isr改变顺序的集合。至于isr是干啥的,网上的内容比较多,搜索即可。
  • 最后涉及到两个配置,分别是:
    • producer.purgatory.purge.interval.requests:默认值1000,用于在procucer的ack设置是-1或者1时,跟踪消息是否添加成功,使用DelayedProduce实现。成功后清除。
    • fetch.purgatory.purge.interval.requests:默认值1000,fetch 请求清除时的清除间隔

三、启动ReplicaManager

我们主要看下ReplicaManager的start方法:

def startup() {
// start ISR expiration thread
scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
}

这块主要启动了两个定时任务,分别是maybeShrinkIsr和maybePropagateIsrChanges。下面我们着重分析下。

3.1 maybeShrinkIsr

这个方法的调用时间间隔由配置replica.lag.time.max.ms控制,主要用于检查partition对应的isr列表中是否有心跳过期的isr。

  private def maybeShrinkIsr(): Unit = {
    trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
    allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
  }

这块主要是遍历了所有的partition,每个partition都执行maybeShrinkIsr方法,下面我们进入maybeShrinkIsr,分析下主要做了哪些事情。

  def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
    val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
      leaderReplicaIfLocal() match {
        case Some(leaderReplica) =>
          val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
          if(outOfSyncReplicas.size > 0) {
            val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
            assert(newInSyncReplicas.size > 0)
            info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
              inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
            // update ISR in zk and in cache
            updateIsr(newInSyncReplicas)
            // we may need to increment high watermark since ISR could be down to 1

            replicaManager.isrShrinkRate.mark()
            maybeIncrementLeaderHW(leaderReplica)
          } else {
            false
          }

        case None => false // do nothing if no longer leader
      }
    }

整个步骤如下:

  • leaderReplicaIfLocal:先检查当前的partition的leader是否为当前的broker,如果为是,就不进入下面的方法,否则进入下面的方法。
  • getOutOfSyncReplicas:获取不同步的replica列表,获取的方法是首先从isr中去除掉leader,然后把当前时间-lastCaughtUpTimeMs大于replicaMaxLagTimeMs的replica筛选出来,即为outOfSyncReplicas。这里面的lastCaughtUpTimeMs是指上次同步的时间,不一定是心跳时间。
  • 如果outOfSyncReplicas中存在replica,则继续。两个列表进行差值运算后得到新的isr列表,之后更新isr列表(即zk中的数据)。
  • 最后可能需要更新下HW

3.2 maybePropagateIsrChanges

这个方法的调用时间是固定的,不由配置决定,代码中写死,为2500ms。这个方法会把isr的变化内容更新到zk中去,执行这个方法的条件是:

  • ISR变化没有被广播出去
  • 最近5s内没有ISR变化或者上次广播的时间距离当前时间超过了60s,其实这里的广播就是指写入到zk中
  def maybePropagateIsrChanges() {
    val now = System.currentTimeMillis()
    isrChangeSet synchronized {
      if (isrChangeSet.nonEmpty &&
        (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
          lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
        ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet)
        isrChangeSet.clear()
        lastIsrPropagationMs.set(now)
      }
    }
  }
时间: 2024-11-28 21:45:58

【Kafka源码】ReplicaManager启动过程的相关文章

Nginx学习笔记(六) 源码分析&amp;启动过程

涉及到的基本函数 源码: 1 /* 2 * Copyright (C) Igor Sysoev 3 * Copyright (C) Nginx, Inc. 4 */ 5 6 7 #include <ngx_config.h> 8 #include <ngx_core.h> 9 #include <nginx.h> 10 11 12 static ngx_int_t ngx_add_inherited_sockets(ngx_cycle_t *cycle); 13 sta

Kafka源码环境搭建

本文主要讲述的是如何搭建Kafka的源码环境,主要针对的Windows操作系统下IntelliJ IDEA编译器,其余操作系统或者IDE可以类推. 1.安装和配置JDK 确认JDK版本至少为1.7,最好是1.8及以上.使用java -version命令来查看当前JDK的版本,示例如下: C:\Users\hidden> java -version java version "1.8.0_112" Java(TM) SE Runtime Environment (build 1.8

Apache Kafka源码分析 – Broker Server

1. Kafka.scala 在Kafka的main入口中startup KafkaServerStartable, 而KafkaServerStartable这是对KafkaServer的封装 1: val kafkaServerStartble = new KafkaServerStartable(serverConfig) 2: kafkaServerStartble.startup   1: package kafka.server 2: class KafkaServerStartab

【Kafka源码】Kafka代码模块

Kafka源码依赖于Scala环境,首先需要安装scala,这块请自行百度进行安装. 传送门 当然,我们要分析源码,需要下载源码,请自行从github上面下载. 说明:本文使用的kafka版本为0.10.0.1,这是目前公司使用的版本. 下面说明下kafka源码的工程结构: 下面主要对core目录模块进行说明,这块是kafka的核心. admin:管理员模块,操作和管理topic,paritions相关,包含create,delete topic,扩展patitions api:这块主要负责数据

【Kafka源码】Kafka启动过程

一般来说,我们是通过命令来启动kafka,但是命令的本质还是调用代码中的main方法,所以,我们重点看下启动类Kafka.源码下下来之后,我们也可以通过直接运行Kafka.scala中的main方法(需要指定启动参数,也就是server.properties的位置)来启动Kafka.因为kafka依赖zookeeper,所以我们需要提前启动zookeeper,然后在server.properties中指定zk地址后,启动. 下面我们首先看一下main()方法: def main(args: Ar

【Kafka源码】KafkaController启动过程

[TOC] 之前聊过了很多Kafka启动过程中的一些加载内容,也知道了broker可以分为很多的partition,每个partition内部也可以分为leader和follower,主从之间有数据的复制.那么这么多partition是谁在管理?broker内部有没有主从之分?这就是本文的主角,KafkaController,本文将细细道来. 一.入口 KafkaController的启动入口同样很简洁,在KafkaServer的start方法中. / start kafka controlle

【Kafka源码】日志处理

目前来说,kafka的日志中记录的内容比较多,具体的存储内容见这篇博客,写的比较好.可以看到,存储的内容还是比较多的,当存储文件比较大的时候,我们应该如何处理这些日志?下面我们通过kafka启动过程的源码,分析下kafka的日志处理过程. 一.入口方法 在kafkaServer.scala中的start方法中,有一个这样的调用: / start log manager / logManager = createLogManager(zkUtils.zkClient, brokerState) l

MySQL · 源码分析 · SHUTDOWN过程

ORACLE 中的SHUTDOWN MySQL SHUTDOWN LEVEL 暂时只有一种,源码中留了 LEVEL 的坑还没填 在此借用 Oracle 的 SHUTDOWN LEVEL 分析 Oracle SHUTDOWN LEVEL 共有四种:ABORT.IMMEDIATE.NORMAL.TRANSACTIONAL ABORT 立即结束所有SQL 回滚未提交事务 断开所有用户连接 下次启动实例时,需要recovery IMMEDIATE 允许正在运行的SQL执行完毕 回滚未提交事务 断开所有用

TOMCAT源码分析——启动服务

前言 熟悉Tomcat的工程师们,肯定都知道Tomcat是如何启动与停止的.对于startup.sh.startup.bat.shutdown.sh.shutdown.bat等脚本或者批处理命令,大家一定知道改如何使用它,但是它们究竟是如何实现的,尤其是shutdown.sh脚本(或者shutdown.bat)究竟是如何和Tomcat进程通信的呢?本文将通过对Tomcat7.0的源码阅读,深入剖析这一过程. 由于在生产环境中,Tomcat一般部署在Linux系统下,所以本文将以startup.s