《KAFKA官方文档》入门指南(五)

新的协议版本

  • ListOffsetRequest V1支持精确的基于时间戳的偏移搜索。
  • MetadataResponse V2引入了一个新的参数: “CLUSTER_ID”。
  • FetchRequest v3支持限制请求返回的大小(除了现有的每个分区的限制),它能够返回比限制更大的消息和在请求中加入分区的顺序具有重要意义。
  • JoinGroup V1引入了一个新的字段: “rebalance_timeout”。

升级0.8.40.9.x版本到0.10.0.0

0.10.0.0具有的潜在的重大更改(请在升级前仔细检查更改)和 在升级后的性能影响。通过下面的推荐滚动升级计划,能保证不宕机,不影响性能和随后的升级。
注意:由于新协议的引入,升级客户端之前升级您的Kafka集群是很重要的。

注意0.9.0.0版本的客户端:由于0.9.0.0引入了一个错误,即依赖于ZooKeeper的客户(老Scala高层次消费者和与老消费者一起使用的MirrorMaker)不能和0.10.0.x代理一起工作。因此,代理都升级到0.10.0.x之前, 0.9.0.0客户端应升级到0.9.0.1 . 这一步对0.8.4或0.9.0.1客户端没有必要。

对于滚动升级:

  1. 更新所有代理服务器的server.properties文件,并添加以下属性:

    • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2或0.9.0.0)。
    • log.message.format.version = CURRENT_KAFKA_VERSION(参见升级后的潜在性能的影响对于此配置做什么的详细信息。)
  2. 升级代理。这可以通过简单地将其关机,更新代码,并重新启动实现。
  3. 一旦整个群集升级结束,通过编辑inter.broker.protocol.version并将其设置为0.10.0.0的协议版本。注意:您不应该修改log.message.format.version — 这个参数只能在所有的消费者都已经升级到0.10.0.0之后再修改。
  4. 逐一重新启动代理,新协议版本生效。
  5. 一旦所有的消费者都已经升级到0.10.0,逐一修改log.message.format.version至0.10.0和重启代理服务器。

注意:如果你愿意接受宕机,你可以简单地把所有的代理服务器关闭,更新代码,然后重新启动他们。他们将默认使用新的协议。

注:改变协议版本并重新启动可以在代理服务器升级之后的任何时间做,没有必要必须立刻就做。

升级到0.10.0.0带来的潜在的性能影响

0.10.0消息格式包括一个新的时间戳字段,并对压缩的消息使用相对偏移。磁盘上的消息格式可以通过在server.properties文件的log.message.format.version进行配置。默认的磁盘上的消息格式为0.10.0。如果消费者客户端的版本是0.10.0.0之前的版本,那它只能明白0.10.0之前的消息格式。在这种情况下,代理能够把消息从0.10.0格式转换到一个较早的格式再发送旧版本的响应给消费者。然而,代理不能在这种情况下使用零拷贝转移。Kafka社区报告显示性能的影响为CPU利用率从20%增加至将近100%,这迫使所有客户端的必须即时升级使性能恢复正常。为了避免这样的消息转换带来的性能问题,消费者升级到0.10.0.0之前,在升级代理到0.10.0.0的过程中设置log.message.format.version到0.8.2或0.9.0。这样一来,代理仍然可以使用零拷贝传输,将数据发送到老消费者。一旦消费者升级完成,消息格式更改为0.10.0,这样代理就可以享受新的消息格式包括新的时间戳和改进的压缩算法。这种转换可以支持兼容性,对只有几个还没有更新到最新客户端的应用程序非常有用,但不切实际的是使用一个过度使用的集群中去支持所有消费者的流量。因此,当代理已经升级,但大多数客户端还没有完成升级的情况,要尽可能避免使用这种信息转换。

对于升级到0.10.0.0客户,没有性能影响。

注:设置消息格式版本是一个证明,现有的所有支持的消息都在这个版本或低于该消息格式的版本。否则, 0.10.0.0之前的消费者可能不能正常工作。特别是消息格式设置为0.10.0之后,不应该再改回先前的格式,因为它可能使得0.10.0.0之前的消费者工作异常。

注:由于每个消息中引入了另外的时间戳,生产者发送的消息大小比较小的时候因为额外的负载开销也许会看到吞吐量的下降。同样,副本的复制会让每个消息额外传输8个字节。如果你正在运行接近集群承载能力的网络容量,你可能会压垮网卡,由于超载而发生故障和性能问题。

注:如果您已对生产者启用压缩算法,您可能会注意到降低的生产者吞吐量和/或在某些情况下代理降低的压缩比。当接收到压缩的消息,0.10.0代理避免再次压缩消息,其通常降低了等待时间,并提高了吞吐量。在某些情况下,这可能会减少生产者批量消息包的大小,这可能导致更糟糕的吞吐量。如果发生这种情况,用户可以调整生产者的linger.ms和batch.size以获得更好的吞吐量。此外,用于高效压缩消息的生产者缓冲区比代理使用的缓冲区小,这可能对磁盘的压缩消息比率有负面的影响。我们打算在未来的Kafka版本中能够配置这些参数。

0.10.0.0潜在的重大更改

  • 从Kafka0.10.0.0开始,Kafka消息格式的版本被表示为Kafka版本。例如,消息格式0.9.0指通过Kafka0.9.0支持的最高消息版本。
  • 消息格式0.10.0已经推出,它是默认使用的版本。它引入了一个时间戳字段和相对偏移被用于压缩消息。
  • ProduceRequest /Response V2已经被引入,它在默认情况下支持消息格式0.10.0
  • FetchRequest /Response V2已经被引入,它在默认情况下支持消息格式0.10.0
  • MessageFormatter接口从def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)变为 def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)
  • MessageReader接口从def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]变为 def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]
  • MessageFormatter的包从kafka.tools改变为kafka.common
  • MessageReader的包从kafka.tools改变我kafka.common
  • MirrorMakerMessageHandler不再公开方法handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]),因为它从来没有被调用。
  • 0.7 KafkaMigrationTool不再被打包进Kafka包。如果您需要从0.7迁移到0.10.0,请先迁移到0.8,然后再按照文档的升级过程升级0.8到0.10.0。
  • 新的消费者拥有标准化的API,接受java.util.Collection作为方法参数序列类型。现有的代码可能需要更新才能与0.10.0客户端库一起工作。
  • LZ4-compressed的消息处理被改变为使用可互操作的帧规范(LZ4f V1.5.1)。为了保持与旧客户端的兼容性,这一变化仅适用于消息格式0.10.0及更高版本。使用V0 / V1(消息格式0.9.0)的客户端应该继续使用0.9.0帧规范实现执行产生/抓取LZ4压缩消息。使用生产/获取协议v2或更高版本客户端应该使用互操作LZ4f帧规范。可互操作的LZ4库的列表,请参考http://www.lz4.org/

0.10.0.0显著的变化

  • 从Kafka0.10.0.0开始,新的客户端库Kafka可用于流处理存储在Kafka主题的数据。这个新的客户端库只适用于0.10.x及后面版本的代理。欲了解更多信息,请阅读流文件
  • 对新的消费者,配置参数receive.buffer.bytes的默认值现在是64k。
  • 新的消费者现在公开暴露配置参数exclude.internal.topics去限制内部主题(诸如消费者偏移主题),不让这些主题被偶然的包括在正则表达式的主题订阅中。默认情况下,它处于启用状态。
  • 老Scala生产者已被弃用。用户要尽快迁移他们的代码到Kafka客户端JAR里的Java生产者。
  • 新的消费者API已经被标记为稳定。

升级0.8.00.8.1.X0.8.2.X0.9.0.0

0.9.0.0具有的潜在的重大更改(请在升级前检查),还有以前的版本到现在的代理间协议的变化。这意味着升级的代理和客户端可能不兼容旧版本。您在升级您的客户端之前升级Kafka集群是很重要的。如果您正在使用MirrorMaker下游集群应该先升级为好。

对于滚动升级:

  1. 更新所有代理上的server.properties文件,并添加以下属性:inter.broker.protocol.version = 0.8.2.X
  2. 逐一升级的代理。可以通过简单地将其关闭,更新代码,并重新启动它实现。
  3. 一旦整个群集升级成功,通过编辑inter.broker.protocol.version并将其设置为0.9.0.0的协议版本。
  4. 逐一重新启动代理使新协议版本生效

注意:如果你愿意接受宕机,你可以简单地把所有的代理服务器关闭,更新代码,然后重新启动他们。他们将默认使用新的协议。

注:改变协议版本并重新启动可以在代理服务器升级之后的任何时间做,没有必要必须立刻就做。

0.9.0.0潜在的重大更改

  • Java 1.6不再支持。
  • Scala 2.9不再支持。
  • 1000以上的代理ID现在默认保留,用来做自动分配的代理ID。如果您的集群已存在高于阈值的经纪人的ID确保相应地增加reserved.broker.max.id代理配置属性。
  • 配置参数replica.lag.max.messages被删除。分区Leader将不再考虑滞后的消息数量来决定哪些副本是同步的,。
  • 配置参数replica.lag.time.max.ms现在不仅指从副本提取请求所花费的时间,也标识副本最后一次同步到现在经过的时间。那些副本仍然从领导者获取信息,但在replica.lag.time.max.ms时间内没有从leader最新消息的副本将被认为是不同步的。
  • 压缩主题不再接受没有主键消息和遇到这种情况生产者会抛出一个异常。在0.8.4,没有主键的消息会导致日志压缩线程退出(并停止所有压缩主题的处理)。
  • MirrorMaker不再支持多种目标集群。因此,它只能接受一个–consumer.config参数。要镜像多个源集群,则需要每个源集群至少一个MirrorMaker实例,每个都有自己的消费者配置。
  • 在包org.apache.kafka.clients.tools.*里的工具已移至org.apache.kafka.tools.*。所有的其中的脚本将仍然像往常一样起作用,只是直接导入这些类的自定义代码将受到影响。
  • 默认的KafkaJVM性能选项(KAFKA_JVM_PERFORMANCE_OPTS)已经在kafka-run-class.sh被改变。
  • 该kafka-topics.sh脚本(kafka.admin.TopicCommand)现在失败会返回非零退出代码。
  • 该kafka-topics.sh脚本(kafka.admin.TopicCommand)现在碰到由于使用“.” 或“_”的主题的名称将打印警告信息,以及在实际发生冲突的情况下打印错误信息。
  • 该kafka-console-producer.sh脚本(kafka.tools.ConsoleProducer)默认将使用Java生产者而不是旧的Scala生产者,并且用户必须指定“老生产者”使用旧版本的生产者。
  • 默认情况下,所有命令行工具将打印所有消息记录到stderr而不是stdout。

0.9.0.1的显着变化

  • 新的代理ID自动生成功能可以通过设置broker.id.generation.enable为false禁用。
  • 配置参数log.cleaner.enable现在默认为true。这意味主题在配置 cleanup.policy=compact下将缺省压缩,清洁器进程通过log.cleaner.dedupe.buffer.size缺省被分配128MB堆。您可以检查你的配置log.cleaner.dedupe.buffer.size,并根据您的压缩主题使用其他log.cleaner配置值。
  • 对于新的消费者,配置参数fetch.min.bytes的默认值现在是1。

0.9.0.0弃用的功能

  • 从kafka-topics.sh脚本(kafka.admin.TopicCommand)改变主题配置已被弃用。今后,请使用kafka-configs.sh脚本(kafka.admin.ConfigCommand)。
  • 该kafka-consumer-offset-checker.sh(kafka.tools.ConsumerOffsetChecker)已被弃用。今后,请使用kafka-consumer-groups.sh(kafka.admin.ConsumerGroupCommand)。
  • 该kafka.tools.ProducerPerformance类已弃用。今后,请使用org.apache.kafka.tools.ProducerPerformance(kafka-producer-perf-test.sh也将改为使用新的类)。
  • 生产者配置block.on.buffer.full已被弃用,并将在未来的版本中删除。目前,它的默认值已更改为false。该KafkaProducer将不再抛出BufferExhaustedException而是将使用max.block.ms值来阻止,之后它会抛出一个TimeoutException。如果block.on.buffer.full属性显式的设置为true,它将设置max.block.ms到Long.MAX_VALUE,而metadata.fetch.timeout.ms将不被认可。

0.8.1 升级到0.8.2

0.8.2与0.8.1完全兼容。可以通过简单地将其关闭,更新代码,并重新启动逐一升级代理。

0.8.0升级到 0.8.1

0.8.1与0.8完全兼容。可以通过简单地将其关闭,更新代码,并重新启动逐一升级代理。0.7升级

0.7版本与新版本不兼容。API,Zookeeper的数据结构和协议,可以配置的增加副本(这是在0.7没有的),都发生了重大变化。从0.7到更高版本的升级需要特殊的工具进行迁移。这种迁移可以无需宕机就可以完成。

 

 

注:在此文章中这两个单词被翻译为右边的对应词语。

Records – 记录(持久化的消息)

Broker –代理服务器

转载自 并发编程网 - ifeve.com

时间: 2024-09-19 09:14:39

《KAFKA官方文档》入门指南(五)的相关文章

《KAFKA官方文档》入门指南(一)

1.入门指南 1.1简介 Apache的Kafka是一个分布式流平台(a distributed streaming platform).这到底意味着什么? 我们认为,一个流处理平台应该具有三个关键能力: 它可以让你发布和订阅记录流.在这方面,它类似于一个消息队列或企业消息系统. 它可以让你持久化收到的记录流,从而具有容错能力. 它可以让你处理收到的记录流. Kafka擅长哪些方面? 它被用于两大类应用: 建立实时流数据管道从而能够可靠地在系统或应用程序之间的共享数据 构建实时流应用程序,能够变

《KAFKA官方文档》翻译邀请

之前在北京面试,很多应聘者都提到自己使用过KAFAKA,所以计划组织同学们翻译<KAFKA官方指南>,欢迎有兴趣的同学参与. 如何领取 通过评论领取想要翻译的文章,每次领取一章或一节(根据内容长短),翻译完后再领取其他章节.领取完成之后,译文最好在一个星期内翻译完成,不要超过两周,如果不能完成翻译,也欢迎你邀请其他同学和你一起完成翻译.请谨慎领取,很多文章领取了没有翻译,导致文章很长时间没人翻译. 如何提交? 翻译完成之后请登录到并发网提交成待审核状态,会有专门的编辑校对后进行发布.如果多篇文

《KAFKA官方文档》入门指南(三)

第7步:使用Kafka连接导入/导出数据 从控制台写入数据和写回控制台是一个很方便入门的例子,但你可能想用Kafka使用其他来源的数据或导出Kafka的数据到其他系统.相对于许多系统需要编写定制集成的代码,您可以使用Kafka连接到系统去导入或导出数据. Kafka Connect是包括在Kafka中一个工具,用来导入导出数据到Kafka.它是connectors的一个可扩展工具,其执行定制逻辑,用于与外部系统交互.在这个快速入门,我们将看到如何使用Kafka Connect做一些简单的连接器从

《KAFKA官方文档》入门指南(二)

把功能组合起来 消息的传输,存储和流处理的组合看似不寻常却是Kafka作为流处理平台的关键. 像HDFS分布式文件系统,允许存储静态文件进行批量处理.像这样的系统允许存储和处理过去的历史数据. 传统的企业消息系统允许处理您订阅后才抵达的消息.这样的系统只能处理将来到达的数据. Kafka结合了这些功能,这种结合对Kafka作为流应用平台以及数据流处理的管道至关重要. 通过整合存储和低延迟订阅,流处理应用可以把过去和未来的数据用相同的方式处理.这样一个单独的应用程序,不但可以处理历史的,保存的数据

《KAFKA官方文档》入门指南(四)

1.4生态系统 除了Kafka的主要版本之外,还有很多应用集成了Kafka工具.该生态系统页面中列出的许多工具,包括流处理系统,Hadoop的集成,监控和部署工具. 1.5从以前版本升级 从0.8.4,0.9.x,0.10.0.x或0.10.1.x升级到0.10.2.0 0.10.2.0的有线协议有变化.通过下面的推荐滚动升级计划,你能保证在升级过程中无需停机.但是,请在升级之前查看0.10.2.0版本显著的变化. 从0.10.2版本开始,Java客户端(生产者和消费者)已获得与旧版本代理服务器

《KAFKA官方文档》第三章:快速入门(一)

快速入门 翻译者:kimmking@163.com 原文:kafka.apache.org/quickstart 本教程假设读者完全从零开始,电脑上没有已经存在的Kafka和Zookeeper环境.以下内容需要注意的是:因为在类Unix平台和Windows平台上的Kafka控制脚本不同,在Windows平台上,需要使用路径\bin\windows代替/bin,脚本扩展名改为.bat. 第一步:下载kafka 下载Kafka 0.10.2.0版本 并解压: >tar -xzf kafka_2.11

《KAFKA官方文档》第三章:快速入门(二)

第八步:使用Kafka流(Kafka Streams)处理数据 Kafka流是一个针对存储于Kafka brokers上的数据进行实时流处理和分析的客户端类库.快速入门中的示例将展示如何使用这个类库实现一个数据流处理应用.下面是其中的WordCountDemo数单词示例代码片段(转换成Java8的lambda表达式更便于阅读). "` // 字符串和长整型的序列化器与反序列化器(serde) final Serde stringSerde = Serdes.String(); final Ser

《Kafka 官方文档》 介绍

介绍 Apache Kafka 是 一个分布式数据流平台. 这意味什么呢? 我们认为一个数据流平台有三种能力: 它让你发布和订阅数据流. 在这方面他与消息队列或企业级消息系统很像. 它让你具有很强容灾性的存储数据流. 它让你及时的处理数据流. 那么Kafka适合做什么呢? 它通常被使用在两大类应用中: 搭建可以使数据在系统或应用之间流动的实时数据流管道(pipelines) 搭建可以针对流数据实行实时转换或作出相应反应的数据流应用 为了了解Kafka具体如何实现这些功能, 我们来从底层开始,探索

《KAFKA官方文档》5.2 APIs

2. APIs Kafka包含四种核心的API: Producer API支持应用将数据流发送到Kafka集群的主题. Consumer API支持应用从Kafka集群的主题中读取数据流. Streams API支持数据流从输入主题转化到输出主题. Connect API支持实现持续地从一些源系统或应用划入Kafka或者从Kafka推入一些源系统或应用的接口. Kafka通过独立于语言的协议公开其所有功能,该协议具有可用于诸多编程语言的客户端.但是,只有Java客户端作为主Kafka项目的一部分