Kafka在行动:7步实现从RDBMS到Hadoop的实时流传输

对于寻找方法快速吸收数据到Hadoop数据池的企业, Kafka是一个伟大的选择。Kafka是什么? 它是一个分布式,可扩展的可靠消息系统,把采取发布-订阅模型的应用程序/数据流融为一体。 这是Hadoop的技术堆栈中的关键部分,支持实时数据分析或物联网数据货币化。

本文目标读者是技术人员。 继续读,我会图解Kafka如何从关系数据库管理系统(RDBMS)里流输数据到Hive, 这可以提供一个实时分析使用案例。 为了参考方便,本文使用的组件版本是Hive 1.2.1,Flume 1.6和Kafka 0.9。

如果你想看一下Kafka是什么和其用途的概述, 看看我 在Datafloq 上发布的一篇早期博客。

Kafka用武之地:整体解决方案架构

下图显示了在整体解决方案架构中,RDBMS的业务数据传递到目标 Hive 表格结合了 Kafka , Flume和Hive交易功能。

7步实时流传输到Hadoop

现在深入到解决方案的详细信息,我会告诉你如何简单几步实时流输数据到Hadoop。

1. 从关系数据库管理系统(RDBMS)提取数据

所有关系数据库都有一个记录最近交易的日志文件。 我们的传输流解决方案的第一步是,在能够传到Hadoop的信息格式中获得这些交易。 讲完提取机制得单独占用一篇博文–所以 如果你想了解更多此过程的信息, 请联系我们。

2. 建立Kafka Producer

发布消息到Kafka主题的过程被称为“生产者”。“主题”是Kafka保存的分类消息。 RDBMS的交易将被转换为Kafka话题。 对于该例,让我们想一想销售团队的数据库,其中的交易是作为Kafka主题发表的。 建立Kafka生产者需要以下步骤:

3. 设置 Hive

接下来,我们将在Hive中创建一张表,准备接收销售团队的数据库事务。 在这个例子中,我们将创建一个客户表:

为了让Hive能够处理交易, 配置中需要以下设置:

hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.dbtxnmanager

4.设置Flume Agent,从Kafka到Hive流传输

现在让我们来看看如何创建Flume代理,实现从Kafka主题中获取数据,发送到Hive表。

遵循步骤来设置环境,然后建立Flume代理:

接着,如下创建一个log4j属性文件:

然后为Flume代理使用下面的配置文件:

5.开启Flume代理

使用如下命令开启Flume代理:

$ /usr/hdp/apache-flume-1.6.0/bin/flume-ng agent -n flumeagent1 -f ~/streamingdemo/flume/conf/flumetohive.conf

6.开启Kafka Stream

如下示例,是一个模拟交易消息, 在实际系统中需要由源数据库生成。 例如,以下可能来自重复SQL交易的Oracle数据流,这些交易已提交到数据库, 也可能来自GoledenGate。

7.接收Hive数据

以上所有完成, 现在从Kafka发送数据, 你会看到,几秒之内,数据流就发送到Hive表了。

本文作者:Rajesh Nadipalli

来源:51CTO

时间: 2024-08-04 01:11:08

Kafka在行动:7步实现从RDBMS到Hadoop的实时流传输的相关文章

Kafka下RDBMS中的数据实时传输到Hadoop

现在我们来深入这个解决方案的细节,我将展示你如何可以通过仅仅几步就把数据实时导入到Hadoop中. 1.从RDBMS中抽取数据 所有的关系型数据库都有一个日志文件用于记录最新的事务信息.我们流解决方案的第一步就是获取这些事务数据,并使得Hadoop可以解析这些事务格式.(关于如何解析这些事务日志,原作者并没有介绍,可能涉及到商业信息.) 2.启动Kafka Producer 将消息发送到Kafka主题的进程成为生产者.Topic将Kafka中同类消息写入到一起.RDBMS中的事务消息将被转换到K

一步一步学习大数据:Hadoop生态系统与场景

Hadoop概要 到底是业务推动了技术的发展,还是技术推动了业务的发展,这个话题放在什么时候都会惹来一些争议. 随着互联网以及物联网的蓬勃发展,我们进入了大数据时代.IDC预测,到2020年,全球会有44ZB的数据量.传统存储和技术架构无法满足需求.在2013年出版的<大数据时代>一书中,定义了大数据的5V特点:Volume(大量).Velocity(高速).Variety(多样).Value(低价值密度).Veracity(真实性). 当我们把时间往回看10年,来到了2003年,这一年Goo

H264码流打包分析(精华)

H264码流打包分析 SODB 数据比特串-->最原始的编码数据 RBSP 原始字节序列载荷-->在SODB的后面填加了结尾比特(RBSP trailing bits 一个bit"1")若干比特"0",以便字节对齐. EBSP 扩展字节序列载荷-- >在RBSP基础上填加了仿校验字节(0X03)它的原因是: 在NALU加到Annexb上时,需要填加每组NALU之前的开始码 StartCodePrefix,如果该NALU对应的slice为一帧的开始则

数据中心管理的最佳方案

数据中心基础设施管理(DCIM)有助于规划公司的基础设施及改进数据中心的管理.但是,DCIM也可能导致IT部门与设备部门之间缺乏协调.在本文中,将了解如何帮助这两个团队积极有效的沟通.配合,通过使用不同的DCIM产品和工具成功地管理数据中心.这是获得处理数据中心管理及目前不断变化的服务器技术问题的最优方法. 使用DCIM工具数据中心管理 顾名思义,数据中心基础设施管理(DCIM)能够帮助企业.公司对数据中心进行有效管理.可当你如果询问数据中心不同部门的工作人员,"基础设施"是什么,都包

基于Hadoop的大数据企业前十大集合

超人气Hadoop初创公司前两名 这已经不再是什么秘密了,全球的数据正在以几何数字增长,借助这股数据浪潮在全球范围内迅速成长起来一大批Hadoop的初创型公司.作为Apache的一个开源分支Hadoop几乎已经成为了大数据的代言词.据Gartner估计,目前的Hadoop生态系统市场价值大约为77,000,000: 该研究公司预计,这一数字到2016年将迅速增加到8.13亿美元. 在Hadoop市场快速发展的大环境下,出现了大量的初创型企业来分这将近十亿美元的大馅饼. 1.Platfora 他们

Apache Kylin发布新版流处理引擎

Apache Kylin在 1.5.0 推出了从流数据进行准实时(Near Real Time)处理功能,可以直接从Apache Kafka的主题(Topic)中消费数据来构建Cube.Apache Kylin 1.5.0的流处理是一次实验性的探索,它打破了以往只能从Apache Hive表构建Cube的局限,将数据从产生到可查询的延迟从小时级降低到了分钟级,满足了一些对实时性要求比较高的场景:但它在实现上存在一些局限︰ 不可扩展︰ 由于是利用单个 Java 进程(而不是利用某种计算框架)对数据

《KAFKA官方文档》第三章:快速入门(二)

第八步:使用Kafka流(Kafka Streams)处理数据 Kafka流是一个针对存储于Kafka brokers上的数据进行实时流处理和分析的客户端类库.快速入门中的示例将展示如何使用这个类库实现一个数据流处理应用.下面是其中的WordCountDemo数单词示例代码片段(转换成Java8的lambda表达式更便于阅读). "` // 字符串和长整型的序列化器与反序列化器(serde) final Serde stringSerde = Serdes.String(); final Ser

《KAFKA官方文档》入门指南(三)

第7步:使用Kafka连接导入/导出数据 从控制台写入数据和写回控制台是一个很方便入门的例子,但你可能想用Kafka使用其他来源的数据或导出Kafka的数据到其他系统.相对于许多系统需要编写定制集成的代码,您可以使用Kafka连接到系统去导入或导出数据. Kafka Connect是包括在Kafka中一个工具,用来导入导出数据到Kafka.它是connectors的一个可扩展工具,其执行定制逻辑,用于与外部系统交互.在这个快速入门,我们将看到如何使用Kafka Connect做一些简单的连接器从

Kafka深度解析

[本文转自于Kafka深度解析] 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统.主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能 高吞吐率.即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输 同时支持离线数据处理和实时数据处理 为什么要用消息系统 解耦 在项目启动之初来预测将来项目