kafka - advertised.listeners and listeners

listeners,

Listener List - Comma-separated list of URIs we will listen on and their protocols. 
Specify hostname as 0.0.0.0 to bind to all interfaces. 
Leave hostname empty to bind to default interface. 
Examples of legal listener lists: PLAINTEXT://myhost:9092,TRACE://:9091 PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093

 

advertised.listeners,

Listeners to publish to ZooKeeper for clients to use, if different than the listeners above. 
In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, the value for `listeners` will be used.

 

listeners

是kafka真正bind的地址,

/**
 * An NIO socket server. The threading model is
 *   1 Acceptor thread that handles new connections
 *   Acceptor has N Processor threads that each have their own selector and read requests from sockets
 *   M Handler threads that handle requests and produce responses back to the processor threads for writing.
 */
class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time) extends Logging with KafkaMetricsGroup {

  private val endpoints = config.listeners

  /**
   * Start the socket server
   */
  def startup() {

      endpoints.values.foreach { endpoint =>
        val protocol = endpoint.protocolType
        val processorEndIndex = processorBeginIndex + numProcessorThreads

        for (i <- processorBeginIndex until processorEndIndex)
          processors(i) = newProcessor(i, connectionQuotas, protocol)

        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        acceptors.put(endpoint, acceptor)
        Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
        acceptor.awaitStartup()

        processorBeginIndex = processorEndIndex
      }
  }

在socketServer中,可以看到,确实在SocketServer中accept的是listeners

为每个endpoint都建立acceptor和processer

 

advertised.listeners

是暴露给外部的listeners,如果没有设置,会用listeners

KafkaServer.startup

        /* tell everyone we are alive */
        val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
          if (endpoint.port == 0)
            (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
          else
            (protocol, endpoint)
        }
        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
          config.interBrokerProtocolVersion)
        kafkaHealthcheck.startup()

这里读出advertisedListeners,传给KafkaHealthcheck

/**
 * This class registers the broker in zookeeper to allow
 * other brokers and consumers to detect failures. It uses an ephemeral znode with the path:
 *   /brokers/ids/[0...N] --> advertisedHost:advertisedPort
 *
 * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise
 * we are dead.
 */
class KafkaHealthcheck(brokerId: Int,
                       advertisedEndpoints: Map[SecurityProtocol, EndPoint],
                       zkUtils: ZkUtils,
                       rack: Option[String],
                       interBrokerProtocolVersion: ApiVersion) extends Logging {

像注释你们看到的,

KafkaHealthcheck就是把broker信息注册到zk里面的ephemeral znode,然后当znode消失就知道broker挂了

所以这里注册到zk中的一定是advertisedListeners

/**
   * Register this broker as "alive" in zookeeper
   */
  def register() {
    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
    val updatedEndpoints = advertisedEndpoints.mapValues(endpoint =>
      if (endpoint.host == null || endpoint.host.trim.isEmpty)
        EndPoint(InetAddress.getLocalHost.getCanonicalHostName, endpoint.port, endpoint.protocolType) //如果没有host,默认读取InetAddress.getLocalHost.getCanonicalHostName
      else
        endpoint
    )

    // the default host and port are here for compatibility with older client
    // only PLAINTEXT is supported as default
    // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect
    val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null)) //生成plaintextEndpoint节点,兼容老版本
    zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort, rack, //新的版本只会读updatedEndpoints
      interBrokerProtocolVersion)
  }

 

 

问题是如果kafka间同步到底用的是什么listener,

ReplicaManager.makeFollowers

中会创建FetchThread,

        val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
          new TopicAndPartition(partition) -> BrokerAndInitialOffset(
            metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerSecurityProtocol),
            partition.getReplica().get.logEndOffset.messageOffset)).toMap
        replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)

这个逻辑是,broker间做同步的时候,创建FetchThread时的情况,

可以看到,broker信息还是从metadataCache取到的,

从metadataCache取出相应的broker,然后调用getBrokerEndPoint(config.interBrokerSecurityProtocol),取到相应的endpoint

security.inter.broker.protocol,Security protocol used to communicate between brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.

 

而用户拿到的broker信息,

KafkaApis.handleTopicMetadataRequest

val brokers = metadataCache.getAliveBrokers

    val responseBody = new MetadataResponse(
      brokers.map(_.getNode(request.securityProtocol)).asJava,
      metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
      completeTopicMetadata.asJava,
      requestVersion
    )

这里取决于什么安全协议,request.securityProtocol

public enum SecurityProtocol {
    /** Un-authenticated, non-encrypted channel */
    PLAINTEXT(0, "PLAINTEXT", false),
    /** SSL channel */
    SSL(1, "SSL", false),
    /** SASL authenticated, non-encrypted channel */
    SASL_PLAINTEXT(2, "SASL_PLAINTEXT", false),
    /** SASL authenticated, SSL channel */
    SASL_SSL(3, "SASL_SSL", false),
    /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */
    TRACE(Short.MAX_VALUE, "TRACE", true);

可以看到不同的协议,可以有不同的地址

 

Broker

/**
    * Create a broker object from id and JSON string.
    *
    * @param id
    * @param brokerInfoString
    *
    * Version 1 JSON schema for a broker is:
    * {
    *   "version":1,
    *   "host":"localhost",
    *   "port":9092
    *   "jmx_port":9999,
    *   "timestamp":"2233345666"
    * }
    *
    * Version 2 JSON schema for a broker is:
    * {
    *   "version":2,
    *   "host":"localhost",
    *   "port":9092
    *   "jmx_port":9999,
    *   "timestamp":"2233345666",
    *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"]
    * }
    *
    * Version 3 (current) JSON schema for a broker is:
    * {
    *   "version":3,
    *   "host":"localhost",
    *   "port":9092
    *   "jmx_port":9999,
    *   "timestamp":"2233345666",
    *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
    *   "rack":"dc1"
    * }
    */
  def createBroker(id: Int, brokerInfoString: String): Broker = {
    if (brokerInfoString == null)
      throw new BrokerNotAvailableException(s"Broker id $id does not exist")
    try {
      Json.parseFull(brokerInfoString) match {
        case Some(m) =>
          val brokerInfo = m.asInstanceOf[Map[String, Any]]
          val version = brokerInfo("version").asInstanceOf[Int]
          val endpoints =
            if (version < 1)
              throw new KafkaException(s"Unsupported version of broker registration: $brokerInfoString")
            else if (version == 1) {
              val host = brokerInfo("host").asInstanceOf[String]
              val port = brokerInfo("port").asInstanceOf[Int]
              Map(SecurityProtocol.PLAINTEXT -> new EndPoint(host, port, SecurityProtocol.PLAINTEXT))
            }
            else {
              val listeners = brokerInfo("endpoints").asInstanceOf[List[String]]
              listeners.map { listener =>
                val ep = EndPoint.createEndPoint(listener)
                (ep.protocolType, ep)
              }.toMap
            }
          val rack = brokerInfo.get("rack").filter(_ != null).map(_.asInstanceOf[String])
          new Broker(id, endpoints, rack)
        case None =>
          throw new BrokerNotAvailableException(s"Broker id $id does not exist")
      }
    } catch {
      case t: Throwable =>
        throw new KafkaException(s"Failed to parse the broker info from zookeeper: $brokerInfoString", t)
    }
  }
}

可以看到,老版本的是用host,port

而新版本都是用endpoints,里面可以定义各种协议下的listeners

 

zkUtil

/**
   * This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker
   * or throws an exception if the broker dies before the query to zookeeper finishes
   *
   * @param brokerId The broker id
   * @return An optional Broker object encapsulating the broker metadata
   */
  def getBrokerInfo(brokerId: Int): Option[Broker] = {
    readDataMaybeNull(BrokerIdsPath + "/" + brokerId)._1 match {
      case Some(brokerInfo) => Some(Broker.createBroker(brokerId, brokerInfo))
      case None => None
    }
  }

zkUtil只是读出zk中相应的内容并createBroker

 

结论,

listeners,用于server真正bind

advertisedListeners, 用于开发给用户,如果没有设定,直接使用listeners

 

当前kafka没有区分内外部的流量,一旦设置advertisedListeners,所有流量都会使用这个配置,明显不合理啊

https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic

会解决这个问题

时间: 2025-01-19 20:22:55

kafka - advertised.listeners and listeners的相关文章

部署kafka到服务器

下载好安装包以后将其拷贝到/usr/local/目录下. 部署步骤: 解压 tar -zxvf kafka.... 更改文件名 mv kafka ... kafka 进入kafka目录 cd kafka 创建数据存储目录 mkdir data 编辑配置文件 vim conf/server.properties 编写log.dirs=/usr/local/kafka/data/kafka-logs 编写listeners=PLAINTEXT://:9092 :q 退出vim 启动kafka自带的z

android-Android 自己写的一个全局事件管理器,感觉还有点儿问题,提上代码,希望大神们修改修改!!!

问题描述 Android 自己写的一个全局事件管理器,感觉还有点儿问题,提上代码,希望大神们修改修改!!! import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import android.os

运行时和编译时元编程—编译时元编程

原文链接    译文链接     译者:JackWang 运行时和编译时元编程 第二部分 2 编译时元编程 Groovy的编译时元编程支持编译时生成代码.这些变换(译者注:原文该专有名词是transformations,译者直译为变换,也许不准确.如果有知道准确翻译的读者恳请不吝赐教,待译者修正)叫做程序的抽象语法树(AST),在Groovy里,我们叫做AST变换.AST变换支持在编译过程中植入钩子,修改抽象语法树之后继续编译生成正常的字节码流.和运行时元编程相比,这种转换可以在类文件的修改可见

使用iWebOffice实现电子签章

摘要:随着信息化的发展电子签章已经越来越多的被用到很多OA系统中,今天就来看一下如何使用iWebOffice来实现电子签章功能. 内容: 1.iWebOffice2003的基本原理 2.使用iWebOffice2003实现电子签章 一.iWebOffice2003的基本原理 在开始今天的主题之前先简单的说一下iWebOffice的原理.iWebOffice控件由两部分组成:一个是用于集成在页面上的iWebOffice2003.ocx文件,另一个是运行在后台服务器上的iMsgServer2000.

Angularjs 源码分析3

本文接着上一篇讲 回顾 上次说到了rootScope里的$watch方法中的解析监控表达式,即而引出了对parse的分析,今天我们接着这里继续挖代码. $watch续 先上一块$watch代码 $watch: function(watchExp, listener, objectEquality) { var scope = this, get = compileToFn(watchExp, 'watch'), array = scope.$$watchers, watcher = { fn:

如何使用 Java8 实现观察者模式?(下)

[编者按]本文作者是 BAE 系统公司的软件工程师 Justin Albano.在本篇文章中,作者通过在 Java8 环境下实现观察者模式的实例,进一步介绍了什么是观察者模式.专业化及其命名规则,供大家参考学习.本文系国内 ITOM 管理平台 OneAPM 工程师编译整理. 线程安全的实现 前面章节介绍了在现代Java环境下的实现观察者模式,虽然简单但很完整,但这一实现忽略了一个关键性问题:线程安全.大多数开放的Java应用都是多线程的,而且观察者模式也多用于多线程或异步系统.例如,如果外部服务

Android MVVM(使用经验篇)

      MVVM的大名相信做手机开发的肯定不会陌生,我第一次听到它是从做IOS开发的同学那里听到的,我们的项目之前应用了MVP,要说服大家从MVP到MVVM,肯定得说说为啥,他优秀在那里?       首先我们看看正常MVP的依赖关系图:       这是个经典的MVP依赖关系,View 层和Presenter,Presenter和Model层彼此依赖,但是不会出现MVC那种跨层依赖,例如如果你写出来的View和Model层有依赖的话,那么就不是正常的MVP结构咯~这个结构好处很明显,Mod

php EventManager Module

参考zf2的EventManager,结合实际项目中的需要写了一个简易的EventManager,主要功能有: 订阅者模式: 拦截器: 事件驱动 具体概念就不介绍了,先来看看模块中的几个类或接口. EventManager是核心模块,主要负责为监听事件,添加拦截器,触发事件,下面看一个例子: Java代码   <?php   require 'EventManager.php';   $em = new EventManager();   $em->attach('start', functi

高手快来啊,关于方法级别的事件订阅

问题描述 高手快来啊,关于方法级别的事件订阅 大家好,现在有这么一个需求,请教一下有没有好的实现办法. 需求描述: 1.ClassA产生事件,可能有多种类型,例如:订单创建.订单付款.订单评价...等. 2.有许多类需要知道ClassA产生的事件中的一种或多种类型.例如:ClassB想知道ClassA产生的订单创建.订单评价这两个事件. 3.应该怎么实现?我现在的做法是: Class A { List<ClassAListener> listeners = new ArrayList<&