阿里云大数据利器之-使用sql实现流计算做实时展现业务( flume故障转移版 )

实时业务处理的需求越来越多,也有各种处理方案,比如storm,spark等都可以。那以数据流的方向可以总结成数据源-数据搜集-缓存队列-实时处理计算-数据展现。本文就用阿里云产品简单实现了一个实时处理的方案。

一,总体架构

按照数据流向
数据采集:flume(配置故障转移)
缓存队列:datahub
https://help.aliyun.com/product/53345.html?spm=5176.7618386.3.4.cigK2v
数据计算:阿里流计算(StreamCompute)
https://help.aliyun.com/video_list/54212.html?spm=5176.7618386.3.2.COgP6l
数据落地:rds(mysql)
https://help.aliyun.com/document_detail/26092.html?spm=5176.7841871.6.539.9FTjxU
数据展现:Quick-BI
https://data.aliyun.com/product/bi?spm=5176.8142029.388261.284.spvIS0
或者大屏显示 DATA-V
https://data.aliyun.com/visual/datav?spm=5176.8142029.388261.283.spvIS0

二,搭建过程

1,flume配置搭建
flume在数据采集的开源框架中还是比较常用的,但是在采集输送到datahub中有可能网络断了或者服务器挂了。那这里配置了故障转移,如图,其中sink1和sink2为上面架构中的agentA和agentB.把agentA和agentB分别部署在两台服务器上。

在搭建flume时需要安装DatahubSink插件,参考https://help.aliyun.com/knowledge_detail/42843.html
那看下配置文件


# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source这里监控一个文件变化,写了一个定时脚本每秒插入一条
a1.sources.r1.type = exec
a1.sources.r1.channels=c1
a1.sources.r1.command=tail -F /usr/local/shangdan/test.txt

#define sinkgroups,在这里配置故障转移的sink组
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=failover
a1.sinkgroups.g1.processor.priority.k1=10//这里设置sink的优先级,优先发送到级别高的sink里
a1.sinkgroups.g1.processor.priority.k2=5
a1.sinkgroups.g1.processor.maxpenalty=10000

#define the sink 1,发送到agentA
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=agentA的ip
a1.sinks.k1.port=5555

#define the sink 2 ,发送到agentB
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=agentB的ip
a1.sinks.k2.port=5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel=c1
~

agentA和agentB的配置文件除了IP地址不一样,其他完全一致,这里贴其中一个

A single-node Flume configuration for Datahub
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels=c1
a1.sources.r1.bind= agentA的ip
a1.sources.r1.port= 5555
# Describe the sink
a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink
a1.sinks.k1.datahub.accessID = 
a1.sinks.k1.datahub.accessKey = **
a1.sinks.k1.datahub.endPoint = http://dh-cn-hangzhou.aliyun-inc.com
a1.sinks.k1.datahub.project = shangdantest
a1.sinks.k1.datahub.topic = databubtest
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ,//这里配置数据的分隔符
a1.sinks.k1.serializer.fieldnames = line//配置数据的字段
a1.sinks.k1.batchSize = 1
a1.sinks.k1.serializer.charset = UTF-8
a1.sinks.k1.shard.number = 1
a1.sinks.k1.shard.maxTimeOut = 60
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

三台服务配置完成后启动flume(先启动agentA和agentB)预期结果是agent1发送数据到agentA(优先级高的),如果停止agentA服务,会自动转换发送到agentB。重启agegtA的服务后,再次切回到agentA。
如图:正常启动数据正常传输经过agent1-agentB-datahub

此时,停掉agentA服务,日志报错,故障转移。

重启agentA服务,恢复到之前状态,切回到sink1

2,datahub创建,
在datahub控制台创建项目和topic,
设置分片和生命周期,具体方法见链接
https://help.aliyun.com/document_detail/47448.html?spm=5176.doc47443.6.584.UrSX1A;
datahub中看到有flume传过来的数据

3,配置阿里流计算
登录阿里流计算控制台
注册数据源datahub/rds(也支持阿里其他类型数据源)-编写流计算脚本-调试-上线-启动

如图先注册数据源供脚本使用。必须要有数据来源表和数据结果表。

在编写脚本时,可以直接引用表,会自动插入表结构和配置信息,非常方便

那开始编写脚本必须包括三部分
1,创建数据来源表,这里是datahub表
2,创建数据结果表,这里是rds表
3,将来源表数据写入结果表,并进行计算

如图

三、测试
脚本编写完毕,点击上方【调试】,可以自己先准备一些数据上传测试。也可以直接线上测试,点击上面【上线】,上线成功后在【运维】中能看到项目,点击启动,项目启动几秒就工作了如图:

 然后可以看到监控状态,计算延迟,数据是否倾斜等指标,也有更详细的链路可以查看

最后,我们把整个流程全部启动,到rds中看结果如图

当然,如果希望源源不断的流数据保存下来称为静态的数据,作为后续业务分析统计等用途,在datahub控制台可以直接配置归档到大数据计算服务(Maxcompute)中,直接入库为表数据。
如图

需要在Maxcompute中创建好对应表即可自动归档存储。详细配置如下
https://help.aliyun.com/document_detail/47453.html?spm=5176.doc47439.6.555.3GNrRs

好神奇,几句sql数据就源源不断的流过来,那么前端或者其他业务层可以过来拿数据展示了,数据还可以界面化配置归档入库,十分方便。如果有复杂逻辑计算的,可以申请开通流计算的udf功能,这样看来,学好sql和java,走遍天下都不怕。
数据可视化部分可以参考使用阿里云产品dataV,实现类似双十一大屏效果,也可以使用产品Quick-BI做实时报表。

时间: 2024-10-31 08:13:27

阿里云大数据利器之-使用sql实现流计算做实时展现业务( flume故障转移版 )的相关文章

阿里云大数据利器之-使用flume+sql实现流计算做实时展现业务(归档Maxcompute)

实时业务处理的需求越来越多,也有各种处理方案,比如storm,spark等都可以.那以数据流的方向可以总结成数据源-数据搜集-缓存队列-实时处理计算-数据展现.本文就用阿里云产品简单实现了一个实时处理的方案. 一,总体架构 按照数据流向 数据采集:flume(配置故障转移) 缓存队列:datahubhttps://help.aliyun.com/product/53345.html?spm=5176.7618386.3.4.cigK2v 数据计算:阿里流计算(StreamCompute)http

阿里云大数据利器Maxcompute学习之-假如你使用过hive

如果您是一个大数据开发工程师并且使用过hadoop的hive框架,那么恭喜您,阿里云的大数据计算服务-Maxcompute,您已经会了90%.这篇文章就来简单对比下Maxcompute和hive的异同,来方便刚开始使用Maxcompute的用户,从hive秒速迁移到Maxcompute的使用上. 首先,回顾下hive的概念. 1.hive是基于hadoop的,以表的形式来存储数据,实际上数据是存储在hdfs上,数据库和表其实是hdfs上的两层目录,数据是放在表名称目录下的,计算还是转换成mapr

阿里云大数据利器Maxcompute学习之--数据同步任务常见日志报错总结

在使用大数据开发套件时最常用的就是数据同步模块,工单里最常见的问题就是其中数据同步的问题,这里总结一些常见一些从Maxcompute到其他数据源的同步任务报错案例,主要是日志中出现数据回滚写入的问题.   那首先看下日志中数据回滚的原因,当数据写入rds或者hybridDB等一些支持事务的数据库中,数据批量写入,一旦由于各种原因没有写入成功,这个批次的数据会回滚重新写入,如果再次写入失败,就会报脏数据的错误导致任务失败.数据写入失败可能是以下原因导致回滚.1,脏数据(数据值超过数据类型最大范围,

阿里云大数据利器Maxcompute-使用mapjoin优化查询

大数据计算服务(MaxCompute,原名 ODPS)是一种快速.完全托管的 GB/TB/PB 级数据仓库解决方案.https://help.aliyun.com/document_detail/27800.html?spm=5176.7840267.6.539.po3IvS 主要有三种操作数据的方式SQL,UDF,MapReduce,了解hadoop的同学就比较熟悉这些东西了. 那么Maxcompute的SQL和标准SQL最大的区别就是在Maxcompute中SQL会被解析成MapReduce

阿里云大数据利器Maxcompute学习之--窗口函数实现分组TopN

看到很多用户经常会问如何对分组内进行排序. 官方文档:https://help.aliyun.com/document_detail/34994.html?spm=5176.doc27891.6.611.Q1bk3j 例如需求: 1. odps 里面能否做排名操作,比如一个表里面有 用户ID 和 金额 两个字段,用金额大小排序的话,我如何计算用户的排名(金额最大的是 第一名 ,以此类推) 2. 计算每个金融产品的最大投资者,或者前几名 类似这一类的需求,我们总结为实现分组内的排序,取TopN,那

阿里云大数据利器Maxcompute学习之--分区表的使用

初学大数据Maxcompute时部分用户不是很熟悉Maxcompute分区表的概念和使用,那这篇文章来简单介绍下分区表的概念及使用场景.  实际上,分区在很多框架中都有这个概念,比如开源框架中的hive等.打个比喻,某城市粮仓里存放麦子,粮仓里按照县城分为很多区域,每个县城都有自己的一块地方,每个县城的麦子放在自己对应的区域上.如果上级领导来检查,想看某县城的麦子情况,那直接可以根据区域来迅速找到该县城的麦子.对应到Maxcompute分区表,粮仓相当于其中一张表,每个区域相当于以这个县城命名的

阿里云大数据实验室:MaxCompute使用体验

阿里云大数据实验室时阿里云开发的一站式大数据教学实践和科研创新平台,提供创业创新大赛平台,为各行业用户提供简单易用的大数据真实环境,让数据价值触手可及.在阿里云大数据实验室中集成了MaxCompute.        作为一名初次使用MaxCompute的用户,我体会颇深.MaxCompute 开箱即用,拥有集成化的操作界面,你不必关心集群搭建.配置和运维工作.仅需简单的点击鼠标,几步操作,就可以在MaxCompute中上传数据,分析数据并得到分析结果. 作为一种快速.完全托管的 TB/PB 级

深入阿里云大数据IDE–MaxCompute Studio

摘要:在主办的云栖计算之旅第5期–大数据与人工智能分享中,阿里云计算平台高级专家薛明为大家深入地介绍了阿里云大数据IDE–MaxCompute Studio,并对于其特性和背后的技术思想进行了讲解. 本文根据演讲视频整理而成. 本次将与大家深入地分享阿里云数加平台的大数据开发套件--MaxCompute Studio.其实对于开发者而言,在大数据开发.数据处理以及管理作业时经常会使用到IDE,而在阿里巴巴内部也有上万名大数据开发者,他们也会使用数加平台,也就是阿里巴巴统一的计算引擎--MaxCo

双11来临,阿里云大数据(数加)会出哪些绝招?

双11来临,阿里云大数据(数加)会出哪些绝招? 双11电商       一年一度的"双11狂欢节"就要到了,眼看参加商家们都已经忙得不可开交:备货.营销.广告.预售......以往作战一般会历经"预热"."蓄势"."爆发"."返场"四个阶段,前两个阶段尤其重要,而眼看11月临近,很多商家再次为流量问题而伤神,阿里云的大数据团队继"数据魔方"."全景洞察"之后,新推出一