Kafka实战-Flume到Kafka

1.概述

  前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据。下面是今天要分享的目录:

  • 数据来源
  • Flume到Kafka
  • 数据源加载
  • 预览

  下面开始今天的分享内容。

2.数据来源

  Kafka生产的数据,是由Flume的Sink提供的,这里我们需要用到Flume集群,通过Flume集群将Agent的日志收集分发到
Kafka(供实时计算处理)和HDFS(离线计算处理)。关于Flume集群的Agent部署,这里就不多做赘述了,不清楚的同学可以参考《高可用Hadoop平台-Flume NG实战图解篇》一文中的介绍,下面给大家介绍数据来源的流程图,如下图所示:

  这里,我们使用Flume作为日志收集系统,将收集到的数据输送到Kafka中间件,以供Storm去实时消费计算,整个流程从各个Web节点
上,通过Flume的Agent代理收集日志,然后汇总到Flume集群,在由Flume的Sink将日志输送到Kafka集群,完成数据的生产流程。

3.Flume到Kafka

  从图,我们已经清楚了数据生产的流程,下面我们来看看如何实现Flume到Kafka的输送过程,下面我用一个简要的图来说明,如下图所示:

  这个表达了从Flume到Kafka的输送工程,下面我们来看看如何实现这部分。

  首先,在我们完成这部分流程时,需要我们将Flume集群和Kafka集群都部署完成,在完成部署相关集群后,我们来配置Flume的Sink数据流向,配置信息如下所示:

  • 首先是配置spooldir方式,内容如下所示:
producer.sources.s.type = spooldir
producer.sources.s.spoolDir = /home/hadoop/dir/logdfs

  • 当然,Flume的数据发送方类型也是多种类型的,有:Console、Text、HDFS、RPC等,这里我们系统所使用的是Kafka中间件来接收,配置内容如下所示:
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=dn1:9092,dn2:9092,dn3:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=test

  这样,我们就在Flume的Sink端配置好了数据流向接受方。

4.数据加载

  在完成配置后,接下来我们开始加载数据,首先我们在Flume的spooldir端生产日志,以供Flume去收集这些日志。然后,我们通过Kafka的KafkaOffsetMonitor监控工具,去监控数据生产的情况,下面我们开始加载。

  • 启动ZK集群,内容如下所示:


zkServer.sh start

  注意:分别在ZK的节点上启动。

  • 启动Kafka集群


kafka-server-start.sh config/server.properties &

  在其他的Kafka节点输入同样的命令,完成启动。

  • 启动Kafka监控工具
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
 com.quantifind.kafka.offsetapp.OffsetGetterWeb \
 --zk dn1:2181,dn2:2181,dn3:2181 \
 --port 8089 \
 --refresh 10.seconds \
 --retain 1.days

  启动Flume集群



flume-ng agent -n producer -c conf -f flume-kafka-sink.properties -Dflume.root.logger=ERROR,console

  然后,我在/home/hadoop/dir/logdfs目录下上传log日志,这里我只抽取了一少部分日志进行上传,如下图所示,表示日志上传成功。

5.预览

  下面,我们通过Kafka的监控工具,来预览我们上传的日志记录,有没有在Kafka中产生消息数据,如下所示:

  • 启动Kafka集群,为生产消息截图预览

  • 通过Flume上传日志,在Kafka中产生消息数据

6.总结

  本篇文章给大家讲述了Kafka的消息产生流程,后续会在Kafka实战系列中为大家讲述Kafka的消息消费流程等一整套流程,这里只是为后续的Kafka实战编码打下一个基础,让大家先对Kafka的消息生产有个整体的认识。

7.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

时间: 2024-10-29 05:40:05

Kafka实战-Flume到Kafka的相关文章

Kafka实战-入门

1.概述 经过一个多月的时间观察,业务上在集成Kafka后,各方面还算稳定,这里打算抽时间给大家分享一下Kafka在实际场景中的一些使用心得.本篇博客打算先给大家入个门,让大家对Kafka有个初步的了解,知道Kafka是做什么的,下面是本篇博客的目录内容: Kafka背景 Kafka应用场景 Kafka架构原理 下面开始今天的博客分享内容. 2.Kafka背景 Kafka它本质上是一个消息系统,由当时从LinkedIn出来创业的三人小组开发,他们开发出了Apache Kafka实时信息队列技术,

Kafka实战-实时日志统计流程

1.概述 在<Kafka实战-简单示例> 一文中给大家介绍来Kafka的简单示例,演示了如何编写Kafka的代码去生产数据和消费数据,今天给大家介绍如何去整合一个完整的项目,本篇博客我打 算为大家介绍Flume+Kafka+Storm的实时日志统计,由于涉及的内容较多,这里先给大家梳理一个项目的运用这些技术的流程.下面是今天的内容 目录: 项目流程 Flume Kafka Storm 下面开始今天的内容分享. 2.项目流程 在整合这套方案的时候,项目组也是经过一番讨论,在讨论中,观点很多,有人

Kafka实战-Kafka到Storm

1.概述 在<Kafka实战-Flume到Kafka>一文中给大家分享了Kafka的数据源生产,今天为大家介绍如何去实时消费Kafka中的数据.这里使用实时计算的模型--Storm.下面是今天分享的主要内容,如下所示: 数据消费 Storm计算 预览截图 接下来,我们开始分享今天的内容. 2.数据消费 Kafka的数据消费,是由Storm去消费,通过KafkaSpout将数据输送到Storm,然后让Storm安装业务需求对接受的数据做实时处理,下面给大家介绍数据消费的流程图,如下图所示: 从图

Kafka实战-数据持久化

1.概述 经过前面Kafka实战系列的学习,我们通过学习<Kafka实战-入门>了解Kafka的应用场景和基本原理,<Kafka实战-Kafka Cluster>一文给大家分享了Kafka集群的搭建部署,让大家掌握了集群的搭建步骤,<Kafka实战-实时日志统计流程>一文给大家讲解一个项目(或者说是系统)的整体流程,<Kafka实战-Flume到Kafka>一文给大家介绍了Kafka的数据生产过程,<Kafka实战-Kafka到Storm>一文给

Kafka实战-简单示例

1.概述 上一篇博客<Kafka实战-Kafka Cluster>中,为大家介绍了Kafka集群的安装部署,以及对Kafka集群Producer/Consumer.HA等做了相关测试,今天我们来开发一个Kafka示例,练习如何在Kafka中进行编程,下面是今天的分享的目录结构: 开发环境 ConfigureAPI Consumer Producer 截图预览 下面开始今天的内容分享. 2.开发环境 在开发Kafka相关应用之前,我们得将Kafka得开发环境搭建完成,这里我所使用得开发环境如下所

Kafka实战-Storm Cluster

1.概述 在<Kafka实战-实时日志统计流程>一文中,谈到了Storm的相关问题,在完成实时日志统计时,我们需要用到Storm去消费Kafka Cluster中的数据,所以,这里我单独给大家分享一篇Storm Cluster的搭建部署.以下是今天的分享目录: Storm简述 基础软件 安装部署 效果预览 下面开始今天的内容分享. 2.Storm简述 Twitter将Storm开源了,这是一个分布式的.容错的实时计算系统,已被贡献到Apache基金会,下载地址如下所示: http://stor

flume kafka-关于flume和kafka结合效率的问题

问题描述 关于flume和kafka结合效率的问题 最近做了个测试.是flume+kafka的.是读取文件夹的.31M的文件读了很长时间.大概20分钟.不知道什么原因.哪位大神知道啊.指导下. 下面是flume的配置 #agent section producer.sources = s producer.channels = c producer.sinks = r #source section #producer.sources.s.type = seq #producer.sources

Kafka实战-KafkaOffsetMonitor

1.概述 前面给大家介绍了Kafka的背景以及一些应用场景,并附带上演示了Kafka的简单示例.然后,在开发的过程当中,我们会发现一些问题,那就 是消息的监控情况.虽然,在启动Kafka的相关服务后,我们生产消息和消费消息会在终端控制台显示这些记录信息,但是,这样始终不够友好,而且,在实际 开发中,我们不会有权限去一直观看终端控制台,那么今天就为大家来介绍Kafka的一个监控系统--KafkaOffsetMonitor.下面是今天所 分享的目录内容: KafkaOffsetMonitor简述 K

Kafka实战-Kafka Cluster

1.概述 在<Kafka实战-入门>一篇中,为大家介绍了Kafka的相关背景.原理架构以及一些关键知识点,本篇博客为大家来赘述一下Kafka Cluster的相关内容,下面是今天为大家分享的目录: 基础软件的准备 Kafka Cluster的部署 Send Messages HA特性 下面开始今天的内容分享. 2.基础软件的准备 2.1 ZK 由于Kafka Cluster需要依赖ZooKeeper(后面简称ZK)集群来协同管理,所以这里我们需要事先搭建好ZK集群,关于ZK的集群搭建,大家可以