【Kafka源码】Kafka启动过程

一般来说,我们是通过命令来启动kafka,但是命令的本质还是调用代码中的main方法,所以,我们重点看下启动类Kafka。源码下下来之后,我们也可以通过直接运行Kafka.scala中的main方法(需要指定启动参数,也就是server.properties的位置)来启动Kafka。因为kafka依赖zookeeper,所以我们需要提前启动zookeeper,然后在server.properties中指定zk地址后,启动。

下面我们首先看一下main()方法:

def main(args: Array[String]): Unit = {
try {
val serverProps = getPropsFromArgs(args)
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

  // attach shutdown handler to catch control-c
  Runtime.getRuntime().addShutdownHook(new Thread() {
    override def run() = {
      kafkaServerStartable.shutdown
    }
  })

  kafkaServerStartable.startup
  kafkaServerStartable.awaitShutdown
}
catch {
  case e: Throwable =>
    fatal(e)
    System.exit(1)
}
System.exit(0)

}
我们慢慢来分析下,首先是getPropsFromArgs(args),这一行很明确,就是从配置文件中读取我们配置的内容,然后赋值给serverProps。第二步,KafkaServerStartable.fromProps(serverProps),

object KafkaServerStartable {
def fromProps(serverProps: Properties) = {
KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
new KafkaServerStartable(KafkaConfig.fromProps(serverProps))
}
}
这块主要是启动了一个内部的监控服务(内部状态监控)。

下面是一个在java中常见的钩子函数,在关闭时会启动一些销毁程序,保证程序安全关闭。之后就是我们启动的重头戏了:kafkaServerStartable.startup。跟进去可以很清楚的看到,里面调用的方法是KafkaServer中的startup方法,下面我们重点看下这个方法(比较长):

def startup() {
try {
info("starting")

  if(isShuttingDown.get)
    throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

  if(startupComplete.get)
    return

  val canStartup = isStartingUp.compareAndSet(false, true)
  if (canStartup) {
    metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)

    brokerState.newState(Starting)

    / start scheduler /
    kafkaScheduler.startup()

    / setup zookeeper /
    zkUtils = initZk()

    / start log manager /
    logManager = createLogManager(zkUtils.zkClient, brokerState)
    logManager.startup()

    / generate brokerId /
    config.brokerId =  getBrokerId
    this.logIdent = "[Kafka Server " + config.brokerId + "], "

    socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
    socketServer.startup()

    / start replica manager /
    replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
      isShuttingDown)
    replicaManager.startup()

    / start kafka controller /
    kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
    kafkaController.startup()

    / start group coordinator /
    groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
    groupCoordinator.startup()

    / Get the authorizer and initialize it if one is specified./
    authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
      val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
      authZ.configure(config.originals())
      authZ
    }

    / start processing requests /
    apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
      kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
    brokerState.newState(RunningAsBroker)

    Mx4jLoader.maybeLoad()

    / start dynamic config manager /
    dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config),
                                                       ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))

    // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
    // TODO: Move this logic to DynamicConfigManager
    AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
      case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
    }

    // Create the config manager. start listening to notifications
    dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
    dynamicConfigManager.startup()

    / tell everyone we are alive /
    val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
      if (endpoint.port == 0)
        (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
      else
        (protocol, endpoint)
    }
    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
      config.interBrokerProtocolVersion)
    kafkaHealthcheck.startup()

    // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
    checkpointBrokerId(config.brokerId)

    / register broker metrics /
    registerStats()

    shutdownLatch = new CountDownLatch(1)
    startupComplete.set(true)
    isStartingUp.set(false)
    AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
    info("started")
  }
}
catch {
  case e: Throwable =>
    fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
    isStartingUp.set(false)
    shutdown()
    throw e
}

}
首先判断是否目前正在关闭中或者已经启动了,这两种情况直接抛出异常。然后是一个CAS的操作isStartingUp,防止线程并发操作启动,判断是否可以启动。如果可以启动,就开始我们的启动过程。

构造Metrics类
定义broker状态为启动中starting
启动定时器kafkaScheduler.startup()
构造zkUtils:利用参数中的zk信息,启动一个zk客户端
启动文件管理器:读取zk中的配置信息,包含__consumer_offsets和__system.topic__。重点是启动一些定时任务,来删除符合条件的记录(cleanupLogs),清理脏记录(flushDirtyLogs),把所有记录写到一个文本文件中,防止在启动时重启所有的记录文件(checkpointRecoveryPointOffsets)。
/**

  • Start the background threads to flush logs and do log cleanup
    /
    def startup() {
    /
    Schedule the cleanup task to delete old logs */
    if(scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
    cleanupLogs,
    delay = InitialTaskDelayMs,
    period = retentionCheckMs,
    TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
    flushDirtyLogs,
    delay = InitialTaskDelayMs,
    period = flushCheckMs,
    TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
    checkpointRecoveryPointOffsets,
    delay = InitialTaskDelayMs,
    period = flushCheckpointMs,
    TimeUnit.MILLISECONDS)
    }
    if(cleanerConfig.enableCleaner)
    cleaner.startup()
    }
    下一步,获取brokerId
    启动一个NIO socket服务
    启动复制管理器:启动ISR超时处理线程
    启动kafka控制器:注册session过期监听器,同时启动控制器leader选举
    启动协调器
    权限认证
    开启线程,开始处理请求
    开启配置监听,主要是监听zk节点数据变化,然后广播到所有机器
    开启健康检查:目前只是把broker节点注册到zk上,注册成功就是活的,否则就是dead
    注册启动数据信息
    启动成功
    等待关闭countDownLatch,如果shutdownLatch变为0,则关闭Kafka
时间: 2024-09-13 00:24:19

【Kafka源码】Kafka启动过程的相关文章

Nginx学习笔记(六) 源码分析&启动过程

涉及到的基本函数 源码: 1 /* 2 * Copyright (C) Igor Sysoev 3 * Copyright (C) Nginx, Inc. 4 */ 5 6 7 #include <ngx_config.h> 8 #include <ngx_core.h> 9 #include <nginx.h> 10 11 12 static ngx_int_t ngx_add_inherited_sockets(ngx_cycle_t *cycle); 13 sta

Kafka源码环境搭建

本文主要讲述的是如何搭建Kafka的源码环境,主要针对的Windows操作系统下IntelliJ IDEA编译器,其余操作系统或者IDE可以类推. 1.安装和配置JDK 确认JDK版本至少为1.7,最好是1.8及以上.使用java -version命令来查看当前JDK的版本,示例如下: C:\Users\hidden> java -version java version "1.8.0_112" Java(TM) SE Runtime Environment (build 1.8

Apache Kafka源码分析 – Broker Server

1. Kafka.scala 在Kafka的main入口中startup KafkaServerStartable, 而KafkaServerStartable这是对KafkaServer的封装 1: val kafkaServerStartble = new KafkaServerStartable(serverConfig) 2: kafkaServerStartble.startup   1: package kafka.server 2: class KafkaServerStartab

【Kafka源码】Kafka代码模块

Kafka源码依赖于Scala环境,首先需要安装scala,这块请自行百度进行安装. 传送门 当然,我们要分析源码,需要下载源码,请自行从github上面下载. 说明:本文使用的kafka版本为0.10.0.1,这是目前公司使用的版本. 下面说明下kafka源码的工程结构: 下面主要对core目录模块进行说明,这块是kafka的核心. admin:管理员模块,操作和管理topic,paritions相关,包含create,delete topic,扩展patitions api:这块主要负责数据

【Kafka源码】KafkaController启动过程

[TOC] 之前聊过了很多Kafka启动过程中的一些加载内容,也知道了broker可以分为很多的partition,每个partition内部也可以分为leader和follower,主从之间有数据的复制.那么这么多partition是谁在管理?broker内部有没有主从之分?这就是本文的主角,KafkaController,本文将细细道来. 一.入口 KafkaController的启动入口同样很简洁,在KafkaServer的start方法中. / start kafka controlle

【Kafka源码】ReplicaManager启动过程

在KafkaServer启动过程的入口中,会启动Replica Manager,众所周知,这是一个副本管理器.replica在Kafka中扮演的角色很重要,是保证消息不丢失的一个重要概念. replica的个人理解概念如下:producer发送的消息给broker,broker是分为多个partition的,对于同一个partition中的broker,这些机器是有主从的概念的.producer只会向leader写入消息,consumer只会从leader读取消息,(leader负责读写,rep

【Kafka源码】日志处理

目前来说,kafka的日志中记录的内容比较多,具体的存储内容见这篇博客,写的比较好.可以看到,存储的内容还是比较多的,当存储文件比较大的时候,我们应该如何处理这些日志?下面我们通过kafka启动过程的源码,分析下kafka的日志处理过程. 一.入口方法 在kafkaServer.scala中的start方法中,有一个这样的调用: / start log manager / logManager = createLogManager(zkUtils.zkClient, brokerState) l

Apache Kafka源码分析 - kafka controller

前面已经分析过kafka server的启动过程,以及server所能处理的所有的request,即KafkaApis  剩下的,其实关键就是controller,以及partition和replica的状态机  这里先看看controller在broker server的基础上,多做了哪些初始化和failover的工作   最关键的一句, private val controllerElector = new ZookeeperLeaderElector(controllerContext,

MySQL · 源码分析 · SHUTDOWN过程

ORACLE 中的SHUTDOWN MySQL SHUTDOWN LEVEL 暂时只有一种,源码中留了 LEVEL 的坑还没填 在此借用 Oracle 的 SHUTDOWN LEVEL 分析 Oracle SHUTDOWN LEVEL 共有四种:ABORT.IMMEDIATE.NORMAL.TRANSACTIONAL ABORT 立即结束所有SQL 回滚未提交事务 断开所有用户连接 下次启动实例时,需要recovery IMMEDIATE 允许正在运行的SQL执行完毕 回滚未提交事务 断开所有用