《Kafka官方文档》设计(二)

6. Message Delivery Semantics

现在我们对Producer和Consumer已经有了一定的了解,接着我们来讨论Kafka在Producer和Consumer上提供的语义。显然的,在分发消息时是可以有多种语义的:

  • At most once:消息可能丢失,但不会重复投递
  • At least once:消息不会丢失,但可能会重复投递
  • Exactly once:消息不丢失、不重复,会且只会被分发一次(真正想要的)

值得注意的是这分为两个问题:发布消息的可用性和消费消息的可用性。

许多系统都声称提供“exactly once”语义,仔细阅读会发现,这些声明是误导的(他们没有考虑Producer和Consumer可能Crash的场景,或是数据写入磁盘后丢失的情况)。

Kafka提供的语义是直接了当的。发送消息的时候我们有一个消息被Commit到Log的概念。一旦消息已经被Commit,它将不会丢失,只要还有一个复制了消息所在Partition的Broker存活着。“存活”的定义以及我们覆盖的失败的情况将在下一节描述。现在假设一个完美的Broker,并且不会丢失,来理解对Producer和Consumer提供的语义保证。如果Producer发送一条消息,并且发生了网络错误,我们是不能确认错误发生在消息Commit之前还是消息Commit之后的。类似于使用自增主键插入数据库,是不能确认写入之后的主键值的。

Producer没有使用的强制可能的语义。我们无法确认网络是否会发生异常,可以使Producer创建有序的主键使重试发送成为幂等的行为。这个特性对一个复制系统来说不是无价值的,因为服务器在发生故障的情况下依旧需要提供服务。使用这个功能,Producer可以重试,直到收到消息成功commit的响应,在这个点上保证消息发送的exactly once。我们希望把这个特性加到后续的Kafka版本中。

不是所有的场景都需要这样的保证。对应延迟敏感的场景,我们允许Producer指定其期望的可用性级别。如果Producer期望等待消息Commit,那么这可能消耗10ms。Producer也可以指定以异步的方式发送消息或只等Leader节点写入消息(不能Follower)。

接着我们从消费者的视角来描述语义。所有的副本都拥有偏移量相同的日志。Consumer控制它在日志中的偏移量。如果Consumer一直正常运行,它可以只把偏移量存储在内存中,但是如果Consumer crash且我们期望另一个新的Consumer接管消费,那么需要选择一个位置来开始消费。假设Consumer读取了一些消息——它有集中处理消息和位置的方式。

它可以读取消息,然后保存位置信息,然后处理消息。在这个场景中,Consumer可能在保存位置信息后消费消息失败,那么下一次消费可能从保存的位点开始,尽管之前部分消息被处理失败。这是消费处理过程中失败的at-most-once(只被处理了一次,但是可能处理失败)。

它可以读取消息,之后处理消息,最后保存位置信息。这个场景中,Consumer可能在处理完消息,但是保存位点之前Crash,那么下一次会重新消费这些消息,尽管已经被消费过。这是Consumer Crash引起的at-least-once(消息可能会被处理多次)。

在很多场景冲,消息可以有一个逐渐,这样可以保证处理的幂等性(多次处理不会有影响)。

那么什么是exactly once语义?这里的限制实际上不是消息系统的特性,而是消息处理和位置信息的保存。经典的解决方案是采用两阶段提交的方式来处理。但是这也可以用一个更简单的方式来处理:通过将消息处理结果和位置信息保存在同一位置上。这是更好的,因为很多Consumer期望写入的系统并不支持两阶段提交。例如, 我们的hadoop ETL工具从保存数据到dhfs上的同时也把位移位置也保存到hdfs中了, 这样可以保证数据和位移位置同时被更新或者都没更新.我们在很多系统上使用类似的模式, 用于解决那些需要这种强语义但是却没有主键用于区分重复的储存系统中.

默认Kafka提供at-least-once语义的消息分发,允许用户通过在处理消息之前保存位置信息的方式来提供at-most-once语义。exactly-once语义需要和输出系统像结合,Kafka提供的offset可以使这个实现变的“直接了当的”(变得比较简单)。

7. Replication

Kafka为Topic的每个Partition日志进行备份,备份数量可以由用户进行配置。这保证了系统的自动容错,如果有服务器宕机,消息可以从剩余的服务器中读取。

其他消息系统提供了备份相关的功能,但在我们看来,这是一个附加的功能,不能被大量使用,并且伴随着大量的缺点:Slave是不活跃的(这里应该是指Slave只提供了备份,并不可以被消费等等)、吞吐受到很大的影响、需要手动配置等等。在Kafka中,我们默认就提供备份,实际上我们认为没有备份的Topic是一种特殊的备份,只是备份数为1。

备份的单位是Topic的分区。在没有发生异常的情况下,Kafka中每个分区都会有一个Leader和0或多个Follower。备份包含Leader在内(也就是说如果备份数为3,那么有一个Leader Partition和两个Follower Partition)。所有的读写请求都落在Leader Partition上。通常情况下分区要比Broker多,Leader分区分布在Broker上。Follower上的日志和Leader上的日志相同,拥有相同的偏移量和消息顺序(当然,在特定时间内,Leader上日志会有一部分数据还没复制到Follower上)。

Follower作为普通的Consumer消费Leader上的日志,并应用到自己的日志中。Leader允许Follower自然的,成批的从服务端获取日志并应用到自己的日志中。

大部分分布式系统都需要自动处理故障,需要对节点“alive”进行精确的定义。例如在Kafka中,节点存活需要满足两个条件:

  1. 节点需要保持它和ZooKeeper之间的Session(通过ZK的心跳机制)
  2. 如果是Follower,需要复制Leader上的写事件,并且复制进度没有“落后太多”

我们称满足这两个条件的节点为“同步的”来避免使用“alive”或“failed”这样模糊的概念。Leader节点保存同步中的Follower节点。如果一个Follower宕机或复制落后太多,Leader将从同步的Follower List中将其移除。通过replica.lag.time.max.ms配置来定义“落后太多”。

在分布式系统的术语中,我们只尝试处理“失败/恢复”模型——节点突然停止工作之后恢复的情况。Kafka不处理“拜占庭”问题。

一条消息在被应用到所有的备份上之后被认为是“已经提交的”。只有提交了的消息会被Consumer消费。这意味着Consumer不需要担心Leader节点宕机后消息会丢失。另一方面,Producer可以配置是否等待消息被提交,这取决于他们在延迟和可用性上的权衡。这个可以通过Producer的配置项中设置。

Kafka提供了一条消息被提交之后,只要还有一个备份可用,消息就不会丢失的保证。

Kafka保证在节点故障后依旧可用,但是无法再网络分区的情况下保持可用。

Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)

Kafka分区机制的核心是日志复制。日志复制是分布式系统中最基础的东西,有很多方式可以实现。日志复制可以作为基于状态机的分布式系统的基础设置。

日志复制模型用于处理连续、有序的输入(例如给log entry添加0、1、2这样的编号)。有很多方式实现日志复制,最简单的方式是Leader选择和提供这个顺序之。只要Leader节点存活,Follower只需要按照Leader选择的值和顺序来复制即可。

当然,如果Leader不会宕机,那我们也不需要Follower了!在Leader宕机之后,我们需要在Follower中选择一个节点成为新的Leader。Follower可能会宕机或者日志落后较多,所以我们必须确保选择一个“及时同步”(复制进度和Leader最近的节点)成为新的Leader。复制算法必须提供这样的保证:如果Client收到一条消息已经被Commit了,如果Leader宕机,新Leader必须包含这条已经被Commit的消息。这是一个权衡:Leader在确认消息Commit之前需要等待更多的Follower来确认复制了消息来保证在Leader宕机后有更多可以成为Leader的Follower节点。

如果你选择了所需要的ACK的数量以及选择Leader时需要比较的日志数以确保能重合,这个叫做Quorum。

一个通用的来权衡的方式是提交日志和选择Leader时都采用大多数投票的原则。这不是Kafka使用的方式,但是无所谓,让我们去理解这种方式来了解实现原理。假设一共有2f+1个备份,那么f+1的副本必须在Leader提交commit之前接收到消息,这样就可以从f+1个节点中选择出新节点作为Leader。因为任何f+1个节点,必然有一个节点包含最全的日志。还有很多关于这个算法的细节需要处理(如何定义日志更全、在Leader节点宕机时保持日志一致性等)在这里先忽略。

大多数选票的方法有非常好的特性:延迟取决于同步最快的Server节点。这说明,如果备份数为3,那么延迟取决于两个备份节点中较快的节点。

有很多类似的算法变体,例如ZooKeeper的Zab,Raft,Viewstamped Replication等。和Kafka最相似的学术刊物是微软的PacificA。

大多数选票方式的取消是它不能容忍很多的故障,导致你没有可以被选为新Leader的节点。为了容忍一个节点故障,需要3分数据备份,容忍两个节点故障则需要5个节点。在我们的经验中,只有足够的冗余才能容忍单一的故障在实际系统中是不够的,每次写5次副本,使用5倍的存储空间,和1/5的带宽,在大体量的数据存储上不是很可行。这就是为什么quorum算法多多应用在像ZK这样存储配置的集群中,而不是数据存储系统中。例如HDFS的namenode的高可用建立在大多数选票的机制上,但是数据存储缺不是。

Kafka使用一个明显不同的方式来选择quorum集合。代替大多数选票,Kafka动态的维护一个“同步的备份(in-sync replicas ISR)”的集合。只有这个集合中的成员能被选举为Leader。一个写入请求需要同步到所有的同步中的备份才能认为是提交的。ISR集合在变更时会被持久化到ZK。因此,任何ISR中的备份都可以被选举为新的Leader。这对于Kafka这种拥有多分区并且需要保证这节点负载均衡的模型来说非常重要。使用ISR模型和f+1个副本,Kafka可以容忍f个备份不可用的情况。

对于大多数的场景,我们认为这样的妥协是合理的。在实践中,为了容忍f个节点故障,大多数选票原则和ISR方式都需要等待相同的备份在提交消息前进行确认(如需要容忍一个节点故障,大多数选票的选择需要3个节点,并且提交消息需要至少一个备份的确认;ISR只需要两个节点,需要确认的副本数一样是一个)。相对于大多数选票的原则,ISR方式不需要等待最慢的服务器确认消息是一个优势。尽管如此,我们进行改善,让客户端决定是否等待消息提交,使用较小的副本数,这样带来的吞吐和更小的磁盘空间要求是有价值的。

另一个重要的设计是Kafka不需要故障的节点恢复所有的数据。这是不常见的,复制算法依赖于存储介质在任何故障的情况下都不丢失数据并且不违反一致性原则。这个假设有两个主要的问题。第一,磁盘故障是持久化数据系统中最常见的问题,并且它通常导致数据不完整。第二,即使这不是一个问题,我们也不希望在每一次写入之后都使用fsync来保证一致性,这会使性能下降两三个数量级。我们的协议中允许一个副本重新加入到ISR集合中,在重新加入之前,它需要从新同步在故障时丢失的数据。

Unclean leader election: What if they all die?

Kafka保证的数据不丢失,在至少有一个备份保持同步的情况下。如果一个分区所有的备份的节点都故障,那么就不能提供这个保障了。

但是实践系统中需要一些合理的事情,在所有备份故障时。如果不巧遇上这个问题,去考虑哪些情况会发生是非常重要的。有两种方式去做:

  1. 等待一个ISR中的副本恢复并将其选举为新的Leader(期望它拥有所有的数据)。
  2. 选择第一个副本(无需在ISR中)作为Leader。

这是在可用性和一致性之间的权衡。如果我们等待ISR中的备份恢复,那么会在这个期间一直不可用。如果这样的副本被损坏,那么我们将永久性的失效。另一方便,如果使用不在ISR中的备份成为Leader,尽管它可能不包含所有的日志。默认情况下,Kafka使用第二种策略,当所有ISR中的备份不可用时,倾向于选择可能不一致的备份。这个方式可以通过unclean.leader.election.enable配置禁用,在哪些停机时间优于不一致的场景。

这种困境不是kafka特有的, 这存在于任何基于quorum方式的结构中. 例如, 多数投票算法, 如果大多数的服务器都永久性失效了, 你必须选择丢失全部的数据或者接受某一台可能数据不一致的服务器上的数据.

Availability and Durability Guarantees

在向Kafka写入数据时,Producer可以选择是否等待0,1或(-1)个备份响应。注意,这里说的“被所有备份响应”不是说被所有分配的备份响应,默认情况下只的时所有ISR集合中的备份响应。例如,如果一个Topic配置成只需要两个备份,并且一个备份故障了,那么写入一个备份即认为收到了所有响应。但是,如果这个备份也故障了,那么数据会丢失。这样保证了分区的最大可用,但是可能不是那些相对于可用性更需要可靠性的用户的需求。因此,我们提供两种Topic级别的配置,相对于可用性,优先保证可靠性:

  1. 禁用unclean leader election;如果所有备份不可用,那么分区保持不可用,直到最近的Leader重新恢复可用。这可能导致不可用,但是不会丢失数据。
  2. 配置一个最小的ISR大小;分区只会在满足最小ISR的情况下接受请求,这样可以避免数据只写入一个备份,而这个备份后续故障导致数据丢失。这个配置只在Producer使用acks=all的配置时有效。这个配置在一致性和可用性上做了权衡。更大的ISR提供了更好的一致性,但是降低了可用性,如果同步备份数小于最小ISR配置时将不可用。

Replica Management

以上的讨论都是基于一个日志,即一个Topic的分区考虑的。但是Kafka集群拥有成百上千这样的分区。我们尝试使用轮训的方式来平衡分区,避免高数量的Topic的分区集中在一部分少量的节点上。同样我们要平衡所有Leader分区,这样每个节点上承载的主分区都有一定的比例。

优化Leader的选举过程也是非常重要的,因为这是系统不可用的窗口期。一个直观的实现是,如果一个节点故障了,为这个节点上所有的分区都独立的执行一次选举。代替这种方式,我们选择一个Broker作为Controller,Controller负责一个故障节点影响的所有分区的Leader变更。这样的好处是我们可以批量处理,减少独立选举时大量的通知,这使得大量分区需要选举时变得更快,代价更小。如果Controller故障了,剩余的Broker中会有一个节点成为新的Controller。

转载自 并发编程网 - ifeve.com

时间: 2024-08-29 03:38:21

《Kafka官方文档》设计(二)的相关文章

《KAFKA官方文档》设计与实现(二)

5.4 消息格式 /** * 1. 消息的4字节CRC32 * 2. 一个字节的 identifier ,用以格式的变化,变化的值为0 或者1 * 3. 一个字节的 identifier属性,允许消息的注释与版本无关 * 位 0 ~ 2 : 压缩编解码 * 0 : 无压缩 * 1 : gzip * 2 : snappy * 3 : lz4 * bit 3 : 时间戳类型 * 0 : 创建时间 * 1 : 日志追加时间 * bit 4 ~ 7 : 保留位 * 4. (可选的) 8字节时间戳只有当"

《Kafka官方文档》设计(一)

Design 1. Motivation 我们设计Kafka用来作为统一的平台来处理大公司可能拥有的所有实时数据源.为了做到这点,我们必须思考大量的使用场景. 它必须有高吞吐去支持大数据流,例如实时日志聚合. 它必须优雅的处理数据积压,以支持定期从离线系统加载数据. 这也以为这系统必须支持低延迟的分发来处理传统消息系统的场景. 我们想支持分区的.分布式的.实时的处理数据源并创建新的数据源,这推动了我们的分区和消费模型. 最后,将流反馈到其他系统进行服务的情况下,我们知道系统必须能够保证容错性,在

《KAFKA官方文档》设计与实现(一)

5.设计与实现(IMPLEMENTATION) 5.1 API 设计 生产者 APIS 生产者API包含2个producers-kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.示例代码如下: class Producer { /* Sends the data, partitioned by key to the topic using either the */ /* synchronous or the async

《KAFKA官方文档》第三章:快速入门(二)

第八步:使用Kafka流(Kafka Streams)处理数据 Kafka流是一个针对存储于Kafka brokers上的数据进行实时流处理和分析的客户端类库.快速入门中的示例将展示如何使用这个类库实现一个数据流处理应用.下面是其中的WordCountDemo数单词示例代码片段(转换成Java8的lambda表达式更便于阅读). "` // 字符串和长整型的序列化器与反序列化器(serde) final Serde stringSerde = Serdes.String(); final Ser

《KAFKA官方文档》入门指南(二)

把功能组合起来 消息的传输,存储和流处理的组合看似不寻常却是Kafka作为流处理平台的关键. 像HDFS分布式文件系统,允许存储静态文件进行批量处理.像这样的系统允许存储和处理过去的历史数据. 传统的企业消息系统允许处理您订阅后才抵达的消息.这样的系统只能处理将来到达的数据. Kafka结合了这些功能,这种结合对Kafka作为流应用平台以及数据流处理的管道至关重要. 通过整合存储和低延迟订阅,流处理应用可以把过去和未来的数据用相同的方式处理.这样一个单独的应用程序,不但可以处理历史的,保存的数据

《KAFKA官方文档》入门指南(一)

1.入门指南 1.1简介 Apache的Kafka是一个分布式流平台(a distributed streaming platform).这到底意味着什么? 我们认为,一个流处理平台应该具有三个关键能力: 它可以让你发布和订阅记录流.在这方面,它类似于一个消息队列或企业消息系统. 它可以让你持久化收到的记录流,从而具有容错能力. 它可以让你处理收到的记录流. Kafka擅长哪些方面? 它被用于两大类应用: 建立实时流数据管道从而能够可靠地在系统或应用程序之间的共享数据 构建实时流应用程序,能够变

《KAFKA官方文档》翻译邀请

之前在北京面试,很多应聘者都提到自己使用过KAFAKA,所以计划组织同学们翻译<KAFKA官方指南>,欢迎有兴趣的同学参与. 如何领取 通过评论领取想要翻译的文章,每次领取一章或一节(根据内容长短),翻译完后再领取其他章节.领取完成之后,译文最好在一个星期内翻译完成,不要超过两周,如果不能完成翻译,也欢迎你邀请其他同学和你一起完成翻译.请谨慎领取,很多文章领取了没有翻译,导致文章很长时间没人翻译. 如何提交? 翻译完成之后请登录到并发网提交成待审核状态,会有专门的编辑校对后进行发布.如果多篇文

《Kafka 官方文档》 介绍

介绍 Apache Kafka 是 一个分布式数据流平台. 这意味什么呢? 我们认为一个数据流平台有三种能力: 它让你发布和订阅数据流. 在这方面他与消息队列或企业级消息系统很像. 它让你具有很强容灾性的存储数据流. 它让你及时的处理数据流. 那么Kafka适合做什么呢? 它通常被使用在两大类应用中: 搭建可以使数据在系统或应用之间流动的实时数据流管道(pipelines) 搭建可以针对流数据实行实时转换或作出相应反应的数据流应用 为了了解Kafka具体如何实现这些功能, 我们来从底层开始,探索

《KAFKA官方文档》入门指南(三)

第7步:使用Kafka连接导入/导出数据 从控制台写入数据和写回控制台是一个很方便入门的例子,但你可能想用Kafka使用其他来源的数据或导出Kafka的数据到其他系统.相对于许多系统需要编写定制集成的代码,您可以使用Kafka连接到系统去导入或导出数据. Kafka Connect是包括在Kafka中一个工具,用来导入导出数据到Kafka.它是connectors的一个可扩展工具,其执行定制逻辑,用于与外部系统交互.在这个快速入门,我们将看到如何使用Kafka Connect做一些简单的连接器从