阿里云有两个消息产品,消息队列(ONS)与消息服务(MNS),ONS上有个很好用的功能消息轨迹,消息的生命周期都可以通过控制台查询,那么消息服务上,想看见消息从生产到消费的轨迹数据有什么好办法呢?我们以这个小小的需求为原型,介绍一下怎么利用阿里云现有的产品,搭建出一个简单的数据分析平台。
先画个数据流程架构图。
step 1:
MNS->SLS
先把MNS的日志数据写到SLS里面去,不用写代码,在MNS控制台日志管理页面做个配置即可。MNS的队列要打开logging功能,SLS控制台创建好project logstore这里自动就能看见,注意要一个域的才能配哦。
然后我们跑跑代码,收发点消息,完了去SLS控制台看一样,数据已然自动写进去了,如图
比方我们搜索一条消息79E4FBE5094E8911-2-15D3F907B3B-2000000DB,已经可以看见收发时间,那不就满足看消息轨迹功能了吗,这篇文章可以结束了!!但是本文是硬广,当然不能这么就结束了,堂堂一个攻城狮最擅长什么!当然是无事生非!
我不要在控制台搜索!不要登陆!要让谁都可以看见轨迹!我还不想用sls sdk写一句代码!来来来,下面满足您,顺便把阿里云的产品硬广做一遍。
step 2:
SLS->OSS
SLS里面的数据,有很多消费方式,最简单不写代码的办法就是配置投递到MaxComputer或者OSS,投MaxComputer常用,我们今天来试下OSS。
投递配置文档
注意下分区格式 mydate=%Y%m%d/log_%H%M%s
投递出来的目录就是 mydate=20170714/ 这样可以方便MaxComputer/Hive作为分区使用,截图是投递出来的OSS里面看见的效果。
我们根据日志服务里面数据长什么样来配置怎么写csv文件(之前的查询截图就是数据的样子),做个一一对应的配置,这样投递出来的数据,oss里面的cvs跟日志服务的数据就一样一样了。
好了,做完以上这些,投递就配置好了,需要注意的是,配置好投递之后只会从这个时间点开始生效,已经写入sls的旧数据是不会被投递的,我们再次启动MNS收发消息的程序,造点数据写到sls,让投递工作起来,看见投递任务完成后去OSS里面看看写入的文件吧,前面已经有截图了。
现在oss里面已经有数据了,我们怎么用这个数据看轨迹呢?
硬广告强势插入:
Maxcompute 2.0支持使用OSS数据源做外部表,直接可查OSS,简单介绍一下这个功能,前面几个set是打开2.0的开关,现在2.0的功能要申请通过才能使用。
set odps.task.major.version=2dot0_demo_flighting;
set odps.service.mode=off;
set odps.sql.planner.mode=lot;
set odps.sql.ddl.odps2=true;
set odps.sql.preparse.odps2=lot;
CREATE EXTERNAL TABLE IF NOT EXISTS mns_router_log_external (
AccountId INT,
Action STRING,
MessageId STRING,
NextVisibleTime INT,
ProcessTime INT,
QueueName STRING,
RemoteAddress STRING,
RequestId STRING,
Time STRING
)
PARTITIONED BY (
mydate STRING
)
STORED BY 'com.aliyun.odps.CsvStorageHandler'
WITH SERDEPROPERTIES (
'odps.properties.rolearn'='acs:ram::<aliId>:role/aliyunodpsdefaultrole'
)
LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/ali-shanghai/mns_router';
alter table mns_router_log_external add if not exists partition (mydate='20170714');
select * from mns_router_log_external where mydate='20170714' and MessageId='79E4FBE5094E8911-2-15D3F907B3B-2000000DB';
直接数加控制台运行,出结果:
在MaxComputer里面您可以写sql处理您的数据,比方查平均处理时间啊什么的,这里就是做做广告,不详细做Maxcompute的介绍了。
但素,我们的目标是不登陆直接看数据,这里就要祭出阿里云一款报表产品
Quick BI,Quick BI并不支持OSS作为数据源,可以支持RDS,Maxcompute等等,Maxcompute查询速度相对RDS来说较慢,那么我们就用RDS吧。用大数据处理套件来做数据同步,比方OSS倒入RDS。
Step 3:
OSS->RDS
先在rds上的数据库建张表
CREATE TABLE `mns_route_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`AccountId` bigint(20) DEFAULT NULL,
`Action` varchar(255) DEFAULT NULL,
`MessageId` varchar(255) DEFAULT NULL,
`NextVisibleTime` bigint(11) DEFAULT NULL,
`ProcessTime` bigint(11) DEFAULT NULL,
`QueueName` varchar(255) DEFAULT NULL,
`RemoteAddress` varchar(255) DEFAULT NULL,
`RequestId` varchar(255) NOT NULL,
`Time` varchar(255) NOT NULL,
PRIMARY KEY (`id`),
KEY `MessageId` (`MessageId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
大数据处理套件上我们先建好两个数据源,一个OSS的数据源mns_oss,再建一个RDS的数据源mns_log,然后用脚本模式配置一个同步任务,把数据同步到我们建的这个表里面。
{
"configuration": {
"reader": {
"plugin": "oss",
"parameter": {
"fieldDelimiterOrigin": ",",
"nullFormat": "",
"compress": "",
"datasource": "mns_oss",
"column": [
{
"name": 0,
"index": 0,
"type": "string"
},
{
"name": 1,
"index": 1,
"type": "string"
},
{
"name": 2,
"index": 2,
"type": "string"
},
{
"name": 3,
"index": 3,
"type": "string"
},
{
"name": 4,
"index": 4,
"type": "string"
},
{
"name": 5,
"index": 5,
"type": "string"
},
{
"name": 6,
"index": 6,
"type": "string"
},
{
"name": 7,
"index": 7,
"type": "string"
},
{
"name": 8,
"index": 8,
"type": "string"
}
],
"skipHeader": "false",
"encoding": "UTF-8",
"fieldDelimiter": ",",
"object": [
"mns_router/mydate=${bdp.system.bizdate}/log_*.csv"
]
}
},
"writer": {
"plugin": "mysql",
"parameter": {
"postSql": [],
"datasource": "mns_log",
"column": [
"AccountId",
"Action",
"MessageId",
"NextVisibleTime",
"ProcessTime",
"QueueName",
"RemoteAddress",
"RequestId",
"Time"
],
"writeMode": "replace",
"table": "`mns_route_log`",
"preSql": []
}
},
"setting": {
"errorLimit": {
"record": "100"
},
"speed": {
"concurrent": "5",
"mbps": "5"
}
}
},
"type": "job",
"version": "1.0"
}
注意这个脚本里面使用了一个系统参数${bdp.system.bizdate},这样我们可以配置每天凌晨跑一次,每次同步一天的数据。
运行这个任务,业务时间选2017-07-14,把刚才的数据同步进RDS。
可以配置成每天凌晨运行一次同步任务。
Step 4.
RDS->Quick BI
先在Quick BI添加一个RDS的数据源,找到表mns_route_log生成一个数据集,然后我们在这个数据集后面点一下”新建仪表板”,添加查询条件,保存之后公开仪表板,直接访问生成的地址,不用做任何登陆即可访问,效果如图
通过基于日志服务的数据采集,ODPS/OSS的数据计算,最后Quick BI的报表制作这条线路,小规模的初创公司可以快速的搭建完一个适合业务,容易扩展的数据分析平台,例如时下流行的推荐系统。
阿里云的产品!是不是AMAZING!好想配个无事生非的表情包!