使用Elasticsearch,Kafka和Cassandra构建流式数据中心

在过去的一年里,我遇到了一些软件公司讨论如何处理应用程序的数据(通常以日志和metrics的形式)。在这些讨论中,我经常会听到挫折感,他们不得不用一组零碎的工具,随着时间的推移将这些数据汇总起来。这些工具,如:
- 运维人员使用的,用于监控和告警的工具
- 开发人员用于跟踪性能和定位问题的工具
- 一个完整独立的系统,商业智能(BI)和业务依赖其分析用户行为

虽然这些工具使用不同的视角,适用不同的场景,但是他们同样都是关注数据来源和类型。因此,许多软件团队说,“如果时间充裕,我们可以建立一个更好的”,坦率地说,现在有很多出色的开源代码,自己重头建立一套是否更有意义值得商榷。在Jut我们就是这样做的。我们使用开源的大数据组件建立了一个流式数据分析系统,这篇文章描述了我们使用的片段以及我们如何把它们组合在一起。我们将介绍:
- 数据摄取:如何引入不同类型的数据流
- 索引及保存数据:高效存储以及统一查询
- 串联:系统中的数据流过程
- 调优:让整个过程真正的快速,用户才会真的使用它

我希望通过阅读这篇文章将有助于您的系统在一个理智的,可扩展的方式避免一些我们遇到的陷阱。

数据摄取

当涉及到业务分析和监控,大部分相关的数据类型,格式和传输协议并不是固定的。你需要能够支持系统不同的数据来源和数据发送者。例如,您的数据可能包括下列任何一种:
- 自定义的应用程序事件。
- 容器级指标和日志。
- statsd或收集的度量指标。
- 来自第三方的webhook事件,像GitHub或Stripe。
- 应用程序或服务器日志。
- 用户行为。

虽然这些都有不同的格式和象征,他们在系统内部需要一个统一的格式。无论你选择哪一个格式,你都需要对输入的数据流做转换。

我们选择了简单灵活的数据格式:每个记录(“点”)是一系列的键/值对,它可以方便地表示为一个JSON对象。所有的点都有一个“时间”字段,度量点也有一个数值型的“值”字段;其他点可以有任何的“形状”。前端HTTPS服务器(运行Nginx)接收数据,多路分配并发送到本地的每个数据类型“连接器”进程(运行Node.js)。这些进程将传入的数据转换为系统的内部格式,然后将它们发布到一个Kafka topic(可靠性),从中,它们可以被用于索引和/或处理。

除了上面的数据类型,多考虑使用连接器,能使您自己的团队最容易将输入数据整合到您的数据总线。你可能不需要太多我在这里描述的通用性或灵活性,但设计一些灵活性总是好的,这使你系统能够摄取更多的数据类型,防止以后新数据到来要重新建造。

索引及保存数据

所有这些数据都需要保存在某个地方。最好在一个数据库中,当您的数据需要的增长时,将很容易扩展。并且如果该数据库提供对分析类型的查询方式支持,那最好不过了。如果这个数据中心只是为了存储日志和事件,那么你可以选择Elasticsearch。如果这只是关于度量指标,你可以选择一个时间序列数据库(TSDB)。但是我们都需要处理。我们最终建立了一个系统,有多个本地数据存储,以便我们能够最有效地处理不同类型的数据。

ElasticSearch保存日志以及Events

我们使用Elasticsearch作为事件数据库。这些事件可以有不同的“形状”,这取决于他们来自哪一个来源。我们使用了一些Elasticsearch API,效果很好,特别是查询和聚合API。

Cassandra和ElasticSearch保存Metrics

而metrics,原则上,是完全存储在Elasticsearch(或任何其他数据库),使用一个专门的匹配metrics数据结构以及metrics冗余数据的数据库将更有效。

最好的方法是使用现有的开源时间序列数据库(TSDB)。我们最初是这么使用的 —— 我使用开源TSDB并使用Cassandra作为后端。这种方法的挑战是,TSDB有自己的查询API,它不同于Elasticsearch的API。由于API之间的不同,为事件和指标提供一个统一的搜索和查询界面是很难的。

这就是为什么我们最终决定写自己的TSDB,通过Casandra和Elasticsearch存储metrics。具体来说,我们在Cassandra中存储的时间/值的键值对,在Elasticsearch中存储元数据,并在顶部有一个查询和管理层。这样,搜索和查询事件以及metrics可以统一在Elasticsearch做。

流式处理引擎

那么现在我们有一个摄取数据的途径和一些数据库。我们是否可以准备添加前端应用程序并使用我们的数据?并没有!尽管Elasticsearch本身可以做一些日志和事件分析,我们仍然还需要一个处理引擎。因为:
- 我们需要一个统一的方式来访问事件和指标,包括实时或历史的数据。
- 对于某些情况(监控、报警),当它发生时,我们需要实时处理这些数据。
- 度量指标!我们想要做的不只是寻找度量指标并读出来 - 度量指标是为了优化现有的度量。
- 即使是事件,我们需要一个比Elasticsearch API更通用的处理能力。例如,join不同的来源和数据,或做字符串解析,或自定义聚合。

从这里开始,事情变得非常有趣。你可以花一天(或更多)研究别人是如何建立数据管道,了解Lambda,Kappa等数据架构。实际上有很多非常好的资料在那里。我们就开门见山:我们达到的效果,是一个支持实时数据流和批处理计算的处理引擎。在这方面,我们完全支持,有兴趣的可以看这里以及这里

在这里,不同于存储和摄取,我们从头建立了自己的处理引擎,- 不是因为没有其他的流处理引擎,而是由于我们看重查询的性能,我们将在下面的部分单独讨论。更具体地说,我们建立了一个流处理引擎,实现了数据流处理模型,计算表示被表示为一系列操作的有向图,将输入转化为输出的,这些操作包括聚合,窗口,过滤或join。这能很自然的将模型的查询和计算组合起来,适合实时和批量,且适合分布式运行。

当然,除非你真的在寻找建立一个新的项目,然而我们推荐你使用一个开源的流处理引擎。我们建议你看看RiemannSpark Streaming或者Apache Flink

查询和计算

我们使用流处理引擎,基于数据流模型的计算。但用户如何表达查询和创建这样的数据流图?一个方法是提供一个API或嵌入式DSL。该接口将需要提供查询和筛选数据、定义转换和其他处理操作的方法,而且最重要的是,提供一种将多个处理阶段组合并应用到流图的方法。上述每一个项目都有自己的API,而个人的偏好可能有所不同,API常见的一个挑战是,SQL分析师或Excel用户无法方便的使用。

一个可能的解决问题的方案,在这一点上,可以让这些用户通过基于这些API构建的工具来访问系统(例如,一个简单的web应用程序)。

另一种方法是提供一个简单的查询语言。这是我们Jut在做的。因为目前没有现有的数据流的查询语言(如SQL之于关系查询),我们创建了一个数据流查询语言称为Juttle。它的核心,Juttle的流图查询语言可以用简单的语法,声明处理管道,如上图所示。它具有这些原语,search,window,join,aggregation和group-by,语法简单。当然,在处理一个流程图数据之前,你需要取得到数据 - Juttle允许您定义查询获取数据,通过事件和/或度量的任何组合,实时和/或历史的,都具有相同的语法和结构。下面是一个简单的例子,遵循一个模式…

query | analyze | view

(注意链接使用管道操作符,语法类似shell)。

read -from :1 day ago: data_type = 'web_log'

| reduce -every :minute: count() by status_code

| @timechart

拼在一起:一个异常检测的例子

到目前为止,我们已经采取了一个组件为中心的视角-我们已经讨论了组成成分和它们的作用,但没怎么提到关于如何将它们组合在一起。现在我们将视角切换到以数据为中心,看看支持实时和历史查询需要哪些步骤。让我们使用一个异常检测算法的实例来解说。这是一个很好的例子,因为我们需要查询历史数据来训练潜在的统计模型,实时流数据来测试异常,然后我们需要把结果写回系统,同时异常告警。

但是,在我们做任何查询之前,我们需要串联下摄取的整个过程,传入的数据是如何写入索引存储。这是由import服务完成的,服务完成了包括写入时间序列数据库,将指标数据和元数据存储在Elasticsearch和Cassandra。

现在一个用户来了,启动了一个异常检测的job。这需要读取历史数据,通过任务处理引擎直接查询底层数据库来进行的。不同的查询和数据可以进一步做性能优化(下面讨论),和/或实施度量数据库的读取路径(查询Elasticsearch中的元数据,获取Cassandra中的度量值,并结合结果产生实际的度量点)。

历史数据涵盖了一些过去范围内的数据,处理引擎将历史数据转换成流向图的实时数据。为了做到这一点,处理引擎直接将数据导入import服务的入口点。请注意,这种切换必须小心,以免数据丢弃或者数据重复。

在这一点上,我们有一个训练有素的异常检测流图运行在实时数据上。当检测到异常时,我们希望它将警报发送给一些外部的系统,这可以通过处理引擎向外部的HTTP服务POST数据。除了发送警报,我们还希望保持对内部系统的跟踪。换句话说,我们希望能够将数据流写回系统中。从概念上讲这是通过处理引擎管道返回数据到摄取途径。

调优

那么我们已有了一个摄取数据的工作系统的和一些数据库以及处理引擎。我们可以准备添加前端应用程序并分析我们的数据了吗?还没有!

嗯,我们实际上可以这样做,但问题是我们的查询性能仍然会非常慢。而缓慢的查询意味着……没有人会使用我们的系统。

因此,让我们重新审视一下“统一处理引擎”的概念。按照我们的解释,它是同一个系统使用相同结构,抽象和查询来处理历史或实时的数据。

性能挑战来自于这样的一个事实,历史数据比实时数据要多的多。例如,假设我们有一百万点/秒的速度输入到系统,并有一个是足够快处理过程,可以在数据录入时进行实时查询。现在采取相同的查询语义查询过去一天的数据 - 这将需要一次性处理数百亿点(或者,至少,必须能跟的上从存储点读取的速度)。假设计算是分布式的,我们可以通过增加计算节点来解决,但在最好的情况下,这将是低效和昂贵的。

所以这就是优化的所在。有许多方法可以优化数据查询。其中一些包括对查询本身进行转换 - 例如,上游数据的filters或aggregations尽可能不改变查询语义。我们说的这种优化,是将数据的filter和处理尽量由数据库去做。这需要做以下的:
- 自动识别可以由数据库处理查询的部分
- 将对应的部分转换成目标数据库的查询语言
- 运行后端查询并将结果注入到数据流图的正确位置

结语

我们做到了!当然,如果不需要一个可视化层,我们就完成了。只能通过API来查询系统。建立一个客户端应用程序来创建查询,流和可视化数据,组合仪表板是另外一个棘手的问题,所以我们将改天讨论这个。

现在,让我们来总结一下我们在建设这个数据中心过程中的所见所闻:
- 一个摄取途径,可以接受不同来源的输入数据,并将其转换为统一的格式,并储存起来供以后消费。(在Jut,这是基于Kafka建立的)。
- 事件和度量的数据库。在Jut,Events使用Elasticsearch,自己构建的度量数据库则基于Cassandra。
- 一个处理引擎(或是两个,如果你要用lambda ISH架构)。
- 在系统上运行查询的API或查询语言。

唷。建立这套系统,是一个漫长而有趣的旅程。即便你要建立你自己的系统,可以先试试Jut。你可能会觉得很好用。

时间: 2024-09-19 16:58:53

使用Elasticsearch,Kafka和Cassandra构建流式数据中心的相关文章

【译】使用Apache Kafka构建流式数据平台(1)

前言:前段时间接触过一个流式计算的任务,使用了阿里巴巴集团的JStorm,发现这个领域值得探索,就发现了这篇文章--Putting Apache Kafka To Use: A Practical Guide to Building a Stream Data Platform(Part 1).在读的过程中半总结半翻译,形成本文,跟大家分享. 最近你可能听说很多技术名词,例如"流式处理"."事件数据"以及"实时"等,与之相关的技术有Kafka.S

使用Spark SQL 构建流式处理程序

前言 今天介绍利用 StreamingPro 构建流式(Spark Streaming)计算程序 准备工作 下载StreamingPro README中有下载地址 我们假设您将文件放在了/tmp目录下. 填写配置文件 实例一,Nginx日志解析后存储到ES gist 测试样例, 模拟数据,并且单机跑起来 gist 假设你使用的是第二个配置文件,名字叫做test.json,并且放在了/tmp目录下. 启动StreamingPro Local模式: cd $SPARK_HOME ./bin/spar

动态-流式数据的含义与解释

问题描述 流式数据的含义与解释 求大神解释什么是流式数据?动态图数据和流式图数据是不是一回事? 解决方案 动态图数据和流式图数我觉得应该还是有些差别的. 动态图数据可以是一个数组之类的,流式图数就是一个字节流.

《Flume日志收集与MapReduce模式》一1.3 HDFS与流式数据/日志的问题

1.3 HDFS与流式数据/日志的问题 HDFS并不是真正的文件系统,至少从传统的认识来说不是这样,对于通常的文件系统来说,很多我们认为理所当然的东西并不适合于HDFS,比如挂载.这使得将流式数据装载进Hadoop中变得有些复杂.在通常的Portable Operating System Interface(POSIX)风格的文件系统中,如果打开文件并写入数据,那么在文件关闭前它会一直存在于磁盘上.也就是说,如果另一个程序打开了相同的文件并开始读取,那么它会读取到写入器写到磁盘上的数据.此外,如

集装箱式数据中心的实施战略和注意事项

集装箱数据中心VS传统数据中心 集装箱数据中心是一个可作为数据中心构建的标准模块,将计算,存储,网络资源等IT设施都设计到一个集装箱中.集装箱通常在本地根据客户不同的需求配置好多个机架,然后再运到客户的位置进行部署.集装箱数据中心往往拥有较低的空间,电力及冷却成本的优势,而且可以部署在有电力支持的任何地区,甚至是停车场. 不同于传统数据中心的结构,传统架构一般有一个核心层而且所有的资源都作为一个整体来进行管理.模块化数据中心采用的是分布式的核心架构,相对于整体架构来讲,集装箱数据中心拥有以下几个

谁会使用融合式数据中心基础架构?

融合式数据中心架构的出现显着的改善了几十年来服务器.路由器.存储以及由此而产生的管理方式已经给IT执行者们带来非常大的困扰,这一管理挑战并有助于实现资源按需共享的方式. "以前,融合指的是语音与数据链路通信的整合,"麻省韦尔斯利的一家资讯公司,Think Strategies Inc.的管理总监Jeffrey Kaplan回忆到,"如今的融合管理指的是对传统数据中心环境的实现集中管控,并同时允许授权终端用户对分配给他们的资源实行自主运维". 企业可以建设自己的融合式

集装箱式数据中心对布线系统的影响

本文主要讨论的是布线系统作为网络传输的基础,集装箱式架构数据中心的出现对其会造成什么样的影响,以及布线系统会有什么相应的解决方案. 经过近年来数据中心建设的高速发展,各种新架构层出不穷.在2008年,业界又出现了一种新式的架构:集装箱式数据中心.这种新式架构的数据中心因为其安装快速.部署方便.节能省电.空间占用小等特点,立即在全球各地得到了广泛的应用.各大IT厂商如IBM(PMDC).HP(POD).SUN(Black Box).SGI(ICE Cube)等也都推出了相应的解决方案.各大厂商的解

漂浮式数据中心降低成本及功耗

这种最新的数据中心设计将涉及大量的水,但又不是以通常的方式--其是一个漂浮在水面的数据中心. 鉴于水是保持数据中心设备冷却的最重要的成分,这导致了一家企业直接选择在水面上建立其最新型的漂浮式数据中心. Nautilus Data Technologies Inc.最近推出了一款水上数据中心.这家位于加利福尼亚普莱森顿的公司在今年早些时候就已经在加利福尼亚州马雷岛海军造船厂的一艘110英尺长的船上完成了概念验证. 该生产数据中心正在一艘230英尺长的驳船上打造,将在多个楼层拥有30000平方英尺的

软硬结合 如何构建高效的数据中心?

能耗问题一直是各大数据中心的心头之痛.有数据表明,2015年我国数据中心能耗预计将高达1000亿度,相当于整个三峡水电站一年的发电量;目前国内数据中心的PUE值普遍过高,从2.2到2.6不等.而在国外,施耐德电气参建的位于瑞典的世界首个气候友好型数据中心EcoDataCenter将PUE值控制在1.15以内;位于北卡罗来纳州的勒努瓦的谷歌数据中心,PUE值更是降到了1.12.那么,针对 高能耗.设计不合理.运维成本高的问题,国内要如何构建出如此高效的数据中心?   从设备管理转向资产管理 前一阶