前言:前段时间接触过一个流式计算的任务,使用了阿里巴巴集团的JStorm,发现这个领域值得探索,就发现了这篇文章——Putting Apache Kafka To Use: A Practical Guide to Building a Stream Data Platform(Part 1)。在读的过程中半总结半翻译,形成本文,跟大家分享。
最近你可能听说很多技术名词,例如“流式处理”、“事件数据”以及“实时”等,与之相关的技术有Kafka、Storm、Samza或者是Spark的Steam Model。这些新兴的技术令人兴奋,不过还没有多少人知道如何将这些技术添加到自己的技术栈中,如何实际应用于项目中。
这篇指南讨论我们关于实时数据流的工程经验:如何在你的公司内部搭建实时数据平台、如何使用这些数据构建应用程序,所有这些都是基于实际经验——我们在Linkdin花了五年时间构建Apache Kafka,将Linkdin转换为流式数据架构,并帮助硅谷的很多技术公司完成了同样的工作。
这份指南的第一部分是关于流式数据平台(steam data platform)的概览:什么是流式数据平台,为什么要构建流式数据平台;第二部分将深入细节,给出一些操作规范和最佳实践。
何为流式数据平台?
流式数据平台:简洁、轻量的事件处理
我们在Linkein构建Apache Kafka的目的是让它作为数据流的中央仓库工作,但是为什么要做这个工作,有下面两个原因:
- 数据整合:数据如何在各个系统之间流转和传输;
- 流式处理:通常在数据仓库或者Hadoop集群中需要做丰富的数据分析,同时实现低延时。
接下来介绍下上述两个理论的提出过程。起初我们并没有意识到这些问题之间有联系,我们采取了临时方案:只要需要,就在系统和应用程序之间建造数据通道或者给web服务发送异步请求。随着时间推移,系统越来越复杂,我们在几乎所有子系统之间都建立了不同的数据通道:
data-flow-ugly.png
每个数据通道都有自己的问题:日志数据的规模很大但是数据有缺失,并且数据传输的延迟很高;Oracle数据库实例之间的数据传输速度快、准确而且实时性好,但是其他系统不能及时快速得获得这些数据;Oracle数据库的数据到Hadoop集群的数据通道吞吐量很高,但是只能进行批次操作;搜索系统数据通道的延迟低,不过数据规模小,并且是直接连接数据库;消息系统数据通道的延迟低,但是不可靠且规模小。
随着我们在全球各地添加数据中心,我们也要为这些数据流添加对应的副本;随着系统规模的增长,对应的数据通道规模也应该相应得增长,整个系统面临的压力越来越大。我认为我的团队与其说是由分布式系统工程师组成,还不如说是由一些管道工组成。
更糟的是,复杂性过高导致数据不可靠。由于数据的索引和存储存在问题,导致我们的报告可信度降低。员工需要花费大量时间处理各种类型的脏数据,记得有在处理一起故障中,我们在两个系统中发现一些非常类似但存在微小差异的数据,我们费了很大力气检查这两个数据哪个是争取额的,最后发现两个都不对。
与此同时,我们除了要做数据迁移,还想对数据进行进一步的处理和分析。Hadoop平台提供了批处理、数据打包和专案(ad hoc)处理能力,但是我们还需要一个实时性更好的数据处理平台。我们的很多系统——特别是监控系统、搜索索引的数据通道、数据分析应用以及安全分析应用,都需要秒级的响应速度,但是这类型的应用在上图的系统架构中表现很差。
2010年左右,我们开始构建一个系统:专注于实时获取流式数据(stream data),并规定各个系统之间的数据交互机制也以流式数据为承载,同时还允许对这些流式数据进行实时处理。这就是Apache Kafka的原型。
我们对整个系统的构想如下所示:
stream_data_platform.png
很长一段时间内我们都没有为我们所构建的这个系统取名字,仅仅称之为“Kafka stuff”或者“global commit log thingy”,随着时间推移,我们开始将这个系统中的数据称之为流式数据(steam data),而负责处理这种类型的数据的平台称之为流式数据平台(steam data platform)。
最终我们的系统从前文描述的跟“意大利面条”一样杂乱进化为清晰的以流式数据平台为中心的系统:
A modern strea-centric data architecture built around Apache Kafka
在这个系统中Kafka的角色是通用数据管道。每个子系统都可以很容易得接入到这个中央数据管道上;流式处理应用可以接入到该数据管道上,并对外提供经过处理后的流式数据。这种固定格式的数据类型成为各个子系统、应用和数据中心之间的通用语言。举个例子说明:如果一个用户更新了他的个人信息,这个更新信息会流入我们的系统处理层,在系统处理层会对该用户的公司信息、地理位置和其他属性进行标准化处理;然后这个数据流会流入搜索引擎和社区地图用于查询和检索、这个数据也会流入推荐系统进行工作匹配;所有的这些动作只需要毫秒量级的时间,最后这些数据会流入Hadoop数据仓库。
LinkedIn内部在大量使用这套系统,每天为数百个数据中心处理超过5000亿事件请求,该系统已经成为其他系统的数据后台、成为Hadoop集群的数据管道,以及流式处理的Hub。
由于Kafka开源,因此有很多公司在做类似的事情:Kafka Powered By
接下来我们将论述流式数据平台的一些细节:该平台的工作原理、该平台解决了什么重要问题。
流式数据(Steam Data)
大部分业务逻辑可以理解为事件流(steam of events)。零售业有订单流、交易流、物流信息流、价格调整事件流,以及各类调用的返回值等等;金融行业有订单流、股票价格变更事件流,以及其他金融行业的信息流;网站有点击流、关注流(impressions)、搜索流等等。在大规模的软件系统中还有请求流、错误流、机器监控信息流和日志流。总之,业务逻辑可以从整体上当作一种数据处理系统——接收多种输入流并产生对应的输出流(有时还会产生具体的物理产品)。
这种概念对于习惯于将数据想象为数据库中的一行的同学可能有点陌生,接下来我们看一点关于事件流数据的实际例子。
事件触发和事件流
数据库中存放的是数据的当前状态,当前状态是过去的某些动作(action)的结果,这些动作就是事件。库存表保存购买和交易事件产生的结果,银行结余存放信贷和借记事件的结果;Web Server的延时图是一系列HTTP请求的聚合。
当谈论大数据时,很多人更青睐于记录上述提到的这些事件流,并在此基础上进行分析、优化和决策。某种层度上来说,这些事件流是传统的数据库没有反应出来的一面:它们表示业务逻辑。
事件流数据在金融行业已经广泛使用:股票发行、市场预测、股票交易等数据都可以当作是事件流,但是技术届使得搜集和使用这些数据的现代技术开始流行。Google将广告点击流和广告效果转化为几十亿美金的收入。在web开发届,这些事件数据又被称为日志数据,由于缺乏针对日志处理的模块,这些日志事件就存放在日志文件中。Hadoop之类的系统经常用于日志处理,但是根据实际情况,称之为“批量事件存储和处理(batch event storage and processing)”更合适。
网络公司应该是最早开始记录事件流的公司,搜集网站上的事件数据非常容易:在某些特定节点加一些代码即可记录和跟踪每个用户在改网站上的行为。即使是一个单页面或者是某个流行网站上的移动窗口也能记录很多类似的行为数据用于分析和监控。
你可能听说过“机器产生的数据”这个概念,其实跟事件数据表示相同的含义。某种程度上所有的数据都是机器产生的,因为这些数据来自计算机系统。
还有很多人在谈论设备数据和“物联网(internet of things)”。不同的人对这些名词有各自的理解,但是这个物联网的核心也在于针对某些数据集进行分析和决策,只不过我们这里的分析对象是大规模网络系统,而物联网的分析对象是工业设备或者消费产品。
数据库是事件流
事件流数据很适合描述日志数据或诸如订单、交易、点击和贸易这些具备明显事件特征的数据。和大多数开发人员相同,你可能将自己系统的大部分数据保存在各种数据库中:关系型数据库(Oracle、MySQL和Postgres)或者新兴的分布式数据库(MongoDB、Cassandra和Couchbase),这些数据可能不容易理解为事件或者事件流。
但实际上,数据库中存储的数据也可理解为一种事件流(event steam),简单来说,数据库可以理解为创建数据备份或者建立备库的过程。做数据备份的主要方法是周期性得导出数据库内容,然后将这些数据导入到备库中。如果我很少进行数据备份,或者是我的数据量不大,那么可以进行全量备份。实际上,随着备份频率的提高,全量备份不再可行:如果两天做一次全量备份,将会耗费两倍的系统资源、如果每个小时做一次全量备份,则会耗费24倍的系统资源。在大规模数据的备份中,显然增量备份更加有效:只增加新创建的、更新的数据和删除对应的数据。利用增量备份,如过我们将备份频率提高为原来的1倍,则每次备份的数量将减少几乎一半,消耗的系统资源也差不多。
那么为什么我们不尽可能提高增量备份的频率呢?我们可以做到,但是最后只会得到一系列单行数据改变的记录——这种事件流称之为变更记录,很多数据库系统都有负责这个工作的模块(Oracle数据库系统中的XStreams和GoldenGate、MySQL有binlog replication、Postgres有Logical Log Steaming Replication)。
综上,数据库的变更过程也可以作为事件流的一部分。你可以通过这些事件流同步Hadoop集群、同步备库或者搜索索引;你还可以将这些事件流接入到特定的应用或者流式处理应用中,从而发掘或者分析出新的结论。
流式数据平台解决的问题?
流式数据平台有两个主要应用:
- 数据整合:流式数据平台搜集事件流或者数据变更信息,并将这些变更输送到其他数据系统,例如关系型数据库、key-value存储系统、Hadoop或者其他数据仓库。
- 流式处理:对流式数据进行持续、实时的处理和转化,并将结果在整个系统内开放。
在角色1中,流式数据平台就像数据流的中央集线器。与之交互的应用程序不需要考虑数据源的细节,所有的数据流都以同一种数据格式表示;流式数据平台还可以作为其他子系统之间的缓冲区(buffer)——数据的提供者不需要关心最终消费和处理这些数据的其他系统。这意味着数据的消费者与数据源可以完全解耦合。
如果你需要部署一个新的系统,你只需要将新系统接入到流式数据平台,而不需要为每个特定的需求选择(并管理)各自的数据库和应用程序。不论数据最初来自日志文件、数据库、Hadoop集群或者流式处理系统,这些数据流都使用相同的格式。在流式数据平台上部署新系统非常容易,新系统只需要跟流式数据平台交互,而不需要跟各种具体的数据源交互。
Hadoop集群的设计目标是管理公司的全量数据,直接从HDFS中获取数据是非常耗费时间的方案,而且直接获取的数据不能直接用于实时处理和同步。但是,这个问题可以反过来看:Hadoop等数据仓库可以主动将结果以流式数据的格式推送给其他子系统中。
流式数据平台的角色2包含数据聚合用例,系统搜集各类数据形成数据流,然后存入Hadoop集群归档,这个过程就是一个持续的流式数据处理。流式处理的输出还是数据流,同样可以加载到其他数据系统中。
流式处理可以使用通过简单的应用代码实现,这些处理代码处理事件流并产生新的事件流,这类工作可以通过一些流行的流式处理框架完成——Storm、Samza或Spark Streaming,这些框架提供了丰富的API接口。这些框架发展得都不错,同时它们跟Apache Kafka的交互都很好。
流式数据平台需要提供的能力?
在上文中我提到了一些不同的用例,每个用例都有对应的事件流,但是每个事件流的需求又有所不同——有些事件流要求快速响应、有些事件流要求高吞吐量、有些事件流要求可扩展性等等。如果我们想让一个平台满足这些不同的需求,这个平台应该提供什么能力?
我认为对于一个流式数据平台,应该满足下列关键需求:
- 它必须足够可靠,以便于处理严苛的更新,例如将某个数据库的更新日志变更为搜索索引的存储,能够顺序传输数据并保证不丢失数据;
- 它必须具备足够大的吞吐量,用于处理大规模日志或者事件数据;
- 它必须具备缓冲或者持久化数据的能力,用于与Hadoop这类批处理系统交互。
- 它必须能够为实时处理程序实时提供数据,即延时要足够低;
- 它必须具备良好的扩展性,可以应付整个公司的满负载运行,并能够集成成百上千个不同团队的应用程序,这些应用以插件的形式与流式数据平台整合。
- 它必须能和实时处理框架良好得交互
流式数据平台是整个公司的核心系统,用于管理各种类型的数据流,如果该系统不能提供良好的可靠性以及可扩展性,系统会随着数据量的增长而再次遭遇瓶颈;如果该系统不支持批处理和实时处理,那么就不能与Hadoop或者Storm这类系统整合。
Apache Kafka
Apache Kafka是专门处理流式数据的分布式系统,它具备良好的容错性、高吞吐量、支持横向扩展,并允许地理位置分布的流式数据处理。
Kafka常常被归类于消息处理系统,它确实扮演了类似的角色,但同时也提供了其他的抽象接口。在Kafka中最关键的抽象数据结构是用于记录更新的commit log:
commit_log-copy.png
数据生产者向commit log队列中发送记录流,其他消费者可以像水流一样在毫秒级延时处理这些日志的最新信息。每个数据消费者在commit log中有一个自己的位置(指针),并独立移动,这使得可靠、顺序更新能够分布式得发送给每个消费者。
这个commit log的作用非常关键:可以多个生产者和消费者共享,并覆盖一个集群中的多台机器,每台机器都可用作容错保障;可以提供一个并行模型,其具备的顺序消费的特点使得Kafka可以用于记录数据库的变更。
Kafka是一个现代的分布式系统,存储在一个集群的数据(副本和分片存储)可以水平扩张和缩小,同时上层应用对此毫无感知。数据消费者的机器数量可以随数据规模的增长而水平增加,同时可以自动应对数据处理过程中发生的错误。
Kafka的一个关键设计是对持久化的处理相当好,Kafka的消息代理(broker)可以存储TB量级的数据,这使得Kafka能够完成一些传统数据库无法胜任的任务:
- 接入Kafka的Hadoop集群或者其他离线系统可以放心得停机维护,间隔几小时或者几天后再平滑接入,因为在它停机期间到达的流式数据被存储在Kafka的上行集群。
- 在首次执行同步数据库的任务时可以执行全量备份,以便让下行消费者访问全量数据。
上述这些特性使得Kafka能够提供比传统的消息系统更广的应用范围。
事件驱动的应用
自从我们将Kafka开源后,我们有很多机会与其他想做类似的事情的公司交流和合作:研究如何Kafka系统的部署以及Kafka在该公司内部技术架构的角色如何随着时间演进和改变。
初次部署常常用于单个的大规模应用:日志数据处理,并接入Hadoop集群;也可能是其他数据流,该数据流的规模太大以至于超出了该公司原有的消息系统的处理能力。
从这些用例延伸开来,在接入Hadoop集群后,很快就需要提供实时数据处理的能力,现存的应用需要扩展和重构,利用现有的实时处理框架更高效得处理流式数据。以LinkedIn为例,我们最开始是利用Kafka处理job信息流,并将job信息存入Hadoop集群,然后很多ETL-centric的应用需求开始出现,这些job信息流开始用于其他子系统,如下图所示:
job-view.png
在这张图中,job的定义不需要一些定制就可以与其他子系统交互,当上游应用(移动应用)上出现新的工作信息时,就会通过Kafka发送一个全局事件,下游的数据处理应用只需要响应这个事件即可。
流式数据平台与现存中间件的关系
我们简单讲下流式数据平台与现存的类似系统的关系。
消息系统(Messaging)
流式数据平台类似于企业消息系统——它接收消息事件,并把它们发布到对应事件的订阅者。不过,二者有三个重要的不同:
- 消息系统通常是作为某个应用中的一个组件来部署,不同的应用中有不同的消息系统,而流式数据平台希望成为整个企业的数据流Hub。
- 消息系统与批处理系统(数据仓库或者Hadoop集群)的交互性很差,因为消息系统的数据存储容量有限;
- 消息系统并未提供与实时处理框架整合的API接口。
换句话说,流式数据平台可以看作在公司级别(消息系统的级别是项目)设计的消息系统。
数据聚合工具(Data Integration Tools)
为了便于跟其他系统整合,流式数据平台做了很多工作。它的角色跟Informatica这类工具不同,流式数据平台是可以让任何系统接入,并可以围绕该平台构建不同的应用。
流式数据平台与数据聚合工具有一点重合的实践:使用一个统一的数据流抽象,保证数据格式相同,这样可以避免很多数据清洗任务。我会在这个系列文章的第二篇仔细论述这个主题。
企业服务总线(Enterprise Service Buses)
我认为流式数据平台借鉴了很多企业服务总线的设计思想,不过提供了更好的实现方案。企业服务总线面临的挑战就是自身的数据传输效率很低;企业服务总线在部署时也面临一些挑战:不适合多租户使用(PS,此处需要看下原文,欢迎指导)。
流式数据平台的优势在于数据的传输与系统本身解耦合,数据的传输由各个应用自身完成,这样就能避免平台自身成为瓶颈。
变更记录系统(Change Capture Systems)
常规的数据库系统都有类似的日志机制,例如Golden Gate,然而这个日志记录机制仅限于数据库使用,并不能作为通用的事件记录平台。这些数据库自带的日志记录机制主要用于同类型数据库(eg:Oracle-to-Oracle)之前的互相备份。
数据仓库和Hadoop
流式数据平台并不能替代数据仓库,恰恰相反,它为数据仓库提供数据源。它的身份是一个数据管道,将数据传输到数据仓库,用于长期转化、数据分析和批处理。这个数据管道也为数据仓库提供对外输出结果数据的功能。
流式处理系统(Steam Processing Systems)
常用的流式处理框架,例如Storm、Samza或Spark Streaming可以很容易得跟流式数据平台整合。这些流式数据处理框架提供了丰富的API接口,可以简化数据转化和处理。
流式数据平台的落地与实践
我们不只是提出了一个很好的想法,我们面临的需求很适合将自己的想法落地。过去五年我们都在构建Kafka系统,帮助其他公司落地流式数据平台。今天,在硅谷有很多公司在实践这套设计思路,每个用户的行为都被实时记录并处理。
前瞻
我们一直在思考如何使用公司掌握的数据,因此构建了Confluent平台,该平台上有一些工具用来帮助其他公司部署和使用Apache Kafka。如果你希望在自己的公司部署流式数据处理平台,那么Confluent平台对你绝对有用。
还有一些用的资源:
- 我之前写过的blog post和小书,讨论的主题包括Kafka中的日志抽象、数据流和数据系统架构等;
- Kafka的官方文档也很有用;
- 在这里有关于Confluent平台的更多介绍
这个教程的下篇将会论述在构建和管理数据流平台中的一些实践经验。