Logstash + DataHub + MaxCompute/StreamCompute 进行实时数据分析

Logstash是一款开源日志收集处理框架,有各种不同的input、filter、output插件,用户使用这些插件可以将各种数据源导入到其他系统。
logstash-output-datahub插件,实现将数据导入DataHub的功能,通过简单的配置即可完成数据采集和向DataHub的传输任务。
结合StreamCompute(Galaxy)用户可以方便的完成流式数据从采集,传输,开发到结果数据存储与展示的整套解决方案。
同时,还可以通过创建Collector同步任务将数据同步到MaxCompute(ODPS),之后在MaxCompute上进行完备的数据开发工作。

接下来,会将各个流程步骤在文章中作详细描述,以帮助用户使用Logstash+DataHub+StreamCompute/MaxCompute快速构建起自己的流式数据应用。

数据通道

DataHub服务是阿里云的基于飞天开发的pubsub服务;
创建用于数据采集与传输的DataHub Topic是我们的第一步。

Endpoint列表

公共云DataHub服务Endpoint列表:

公有网络 经典网络ECS Endpoint VPC ECS Endpoint
http://dh-cn-hangzhou.aliyuncs.com http://dh-cn-hangzhou.aliyun-inc.com http://dh-cn-hangzhou-vpc.aliyuncs.com

基本概念

首先,明确DataHub中的几个概念,具体可参见DataHub基本概念:

  • Shard: Shard表示对一个Topic进行数据传输的并发通道,每个Shard会有对应的ID。每个Shard会有多种状态 : "Opening" - 启动中,"Active" - 启动完成可服务
  • Lifecycle: 表示一个Topic中写入数据可以保存的时间,以天为单位
  • Record: 用户数据和DataHub服务端交互的基本单位
  • Schema: 描述Record必须遵守的格式,以及每个字段的类型,包括:bigint、string、boolean、double和timestamp

创建Topic

目前DataHub提供的工具包括Datahub Java SDK和DataHub webconsole,另外console还处于试用阶段,若有需要可联系我们提供。

  • Webconsole

用户可在webconsole上完成对所属资源的基本操作,包括创建、查看、删除Topic以及数据抽样等。在webconsole中创建Topic如下所示:

  • SDK

依次调用以下接口来完成Project和Topic的创建,SDK的一些基本接口可参考SDK基本说明。

<dependency>
  <groupId>com.aliyun.datahub</groupId>
  <artifactId>aliyun-sdk-datahub</artifactId>
  <version>2.3.0-public</version>
</dependency>

public class DatahubClient {
    /**
     * 初始化DatahubClient,
     * @param conf Datahub的配置信息,包括用户的账号信息和datahub endpoint
     */
    public DatahubClient(DatahubConfiguration conf);

    /**
     * 创建Datahub topic
     * @param projectName 该topic所属的project
     * @param topicName 要创建的topic名字
     * @param shardCount 指定该topic的shard数量
     * @param lifeCycle 数据回收时间
     * @param recordType 该topic的record类型,包括TUPLE和BLOB
     * @param recordSchema 当recordType为TUPLE时,需要指定schema
     * @param desc topic的描述信息
     */
    public createTopic(String projectName, String topicName, int shardCount, int lifeCycle, RecordType recordType, RecordSchema recordSchema, String desc);
}

数据采集

由于DataHub提供创建具有schema的Topic的功能,所以用户在使用logstash将数据采集到datahub时,可同时完成对原始数据清洗工作。这样在后续的数据分析工作中,用户能更加方便的进行数据开发。

安装

$ {LOG_STASH_HOME}/bin/plugin install --local logstash-output-datahub-1.0.0.gem
  • 直接下载免安装版logstash(下载地址)。 解压即可使用。
$ tar -xzvf logstash-with-datahub-2.3.0.tar.gz
$ cd logstash-with-datahub-2.3.0

配置信息

我们以一条典型的日志为例,说明如何配置logstash和datahub topic.

示例日志为:

20:04:30.359 [qtp1453606810-20] INFO  AuditInterceptor - [13pn9kdr5tl84stzkmaa8vmg] end /web/v1/project/fhp4clxfbu0w3ym2n7ee6ynh/statistics?executionName=bayes_poc_test GET, 187 ms

对应的Datahub Topic的schema定义为:

字段名称 字段类型
request_time STRING
thread_id STRING
log_level STRING
class_name STRING
request_id STRING
detail STRING

Logstash配置文件为:

input {
    file {
        path => "${APP_HOME}/log/bayes.log"
        start_position => "beginning"
    }
}

filter{
    # 对每一条日志message进行分割,并将各分片指定对应的tag
    # 若将整条日志作为Topic的一个字段,可创建只包含(message string)字段的Topic,从而不用配置grok filter
    grok {
        match => {
           "message" => "(?<request_time>\d\d:\d\d:\d\d\.\d+)\s+\[(?<thread_id>[\w\-]+)\]\s+(?<log_level>\w+)\s+(?<class_name>\w+)\s+\-(?<detail>.+)"
        }
    }
}

output {
    datahub {
        access_id => ""
        access_key => ""
        endpoint => ""
        project_name => "project"
        topic_name => "topic"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/Users/u1/trash/dirty.data"
        dirty_data_file_max_size => 1000
    }
}

启动logstash数据采集

使用命令启动logstash开始数据采集

logstash -f 上述配置文件地址

可使用参数 -b 指定每次batch大小,即每次请求的记录条数,可进行性能调试

# 缓存1000条数据后发送,不指定时默认为125(logstash的默认配置)
logstash -f 上述配置文件地址 -b 1000

数据分析

目前DataHub和计算引擎StreamCompute(Galaxy)和MaxCompute(ODPS)已打通。

在StreamCompute中,可以通过配置DataHub数据源,直接进行数据开发,写入DataHub的数据会被StreamCompute订阅并进行实时计算。

同时,通过创建同步到MaxCompute的Collector,可以将DataHub数据同步到MaxCompute,从而在MaxCompute中进行数据开发。

StreamCompute

在StreamCompute中注册DataHub数据源(帮助文档)。

在StreamCompute中查看或使用DataHub数据(帮助文档)。

MaxCompute

可以通过创建Connector,将DataHub数据导入到MaxCompute(ODPS).
在Webconsole创建Connector是一件方便的事情,(webconsole地址)。如果有很多topic或者topic的field很多,不方便在页面上手动操作,也可以使用SDK。

创建Connector

创建Connector之前,用户必须已创建好MaxCompute的Table,并且所使用的账号必须具备该MaxCompute Project的CreateInstance权限和归档ODPS表的Desc、Alter、Update权限。
在webconsole创建Connector步骤可参考创建Connector.

欢迎加入MaxCompute钉钉群讨论

时间: 2024-10-30 04:07:19

Logstash + DataHub + MaxCompute/StreamCompute 进行实时数据分析的相关文章

【流数据与大屏DataV】如何使用DTS,Datahub,StreamCompute,RDS及DataV搭建流数据大屏

如何使用DTS,Datahub,StreamCompute,RDS及DataV搭建流数据大屏   一,数字化大屏的价值 我们的平台销售管理大屏实时数据展示系统,采用了阿里云最新的大数据及流计算技术,将客户的登录信息.设备信息,销售销量.金额,装车系统的客户信息.车辆信息,发运的地理位置.走向等,以飞线图.热力图.点图.传统的柱状图.饼图等多维度的形式展现给企业及相关领导.强大的视觉冲击和高度可视化的图形及数字展示给人带来清晰直观.真实和充满活力的销售数据. 图1 电商平台实施销售数据大屏

阿里首次披露中台战略:OneData的统一数据标准和实时数据分析是核心

"阿里巴巴正在建设数据中台,统一处理集团近千PB数据,每天被扫描的数据量相当于2千万部高清电影.目前对外服务千万商家与其它生态伙伴,对内服务上万名小二,2015年双十一当天平台调用超过75亿次." 阿里巴巴公共数据平台负责人 罗金鹏  4月20日, UBDC全域大数据峰会·2016上,阿里巴巴公共数据平台负责人罗金鹏首次对外披露了在阿里中台战略下,如何推动数据中台落地的个中细节.   据悉,中台战略是阿里巴巴于2015年底首次提出.作为阿里中台战略的核心之一--数据中台旨在对内提供数据

【CSDN在线培训Q/A】Amazon Kinesis实时数据分析最佳实践分享

问题描述 1月8日,亚马逊AWS的产品拓展经理庄富任在主题为"AmazonKinesis实时数据分析最佳实践分享"的在线培训中,为我们讲述了如何利用Kinesis架构实时数据流处理和分析的能力,并透过目前最热门的手游开发商Supercell客户案例,来体现Kinesis如何处理和分析海量数据流(例如用户的点击,消费,上线等动作).为了帮助大家更好的复习本次培训的相关内容,了解如何在AWS这样的云平台进行数据分析.CSDN整理了本次培训最后的QA,如下:Q1.目前Strom的社区进展缓慢

实时数据分析公司如何颠覆唱片行业?

 实时数据分析就是技术界传说中的大王乌贼:确实有公司在做实时数据分析,却难觅其踪迹. Mixcloud,这一流媒体音频内容的在线平台就是这种神秘生物的一员.这家总部位于伦敦被称作"音频YouTube"的创业公司,不仅使用实时分析以做出快速的业务决策并创造更好的产品,而且在致力于构建一个面向客户的实时数据分析门户网站.Mixcloud首席技术官,也是四个创始人之一的Mat Clayton说,当这个门户网站完成以后, Mixcloud的客户将能够看到是谁在何时收听他们音乐目录.   200

Druid 实时数据分析存储系统

简介 Druid 是一个开源的,分布式的,列存储的,适用于实时数据分析的存储系统,能够快速聚合.灵活过滤.毫秒级查询.和低延迟数据导入. Druid在设计时充分考虑到了高可用性,各种节点挂掉都不会使得druid停止工作(但是状态会无法更新): Druid中的各个组成部分之间耦合性低,如果不需要实时数据完全可以忽略实时节点: Druid使用Bitmap indexing加速列存储的查询速度,并使用CONCISE算法来对bitmap indexing进行压缩,使得生成的segments比原始文本文件

LSI总裁:企业迫切需要实时数据分析解决方案

摘要: LSI公司总裁兼首席执行官 Abhi Talwalkar 近日,在第六届年度加速创新峰会(AIS)上,LSI公司总裁兼首席执行官Abhi Talwalkar提出,数据中心从未像今天这样,对企业的成功有如此巨大的战略 LSI公司总裁兼首席执行官 Abhi Talwalkar 近日,在第六届年度加速创新峰会(AIS)上,LSI公司总裁兼首席执行官Abhi Talwalkar提出,数据中心从未像今天这样,对企业的成功有如此巨大的战略意义,企业迫切需要实时数据分析解决方案. 本届AIS峰会主要围

实时数据分析平台Anametrix走红

中介交易 http://www.aliyun.com/zixun/aggregation/6858.html">SEO诊断 淘宝客 云主机 技术大厅 最近实时数据分析炒得很热,因为大家越来越明白数据生金的道理了. Anametrix实时数据平台刚刚获得440万美元的A轮融资,公司准备把这些融资用于增加更多资源,以便满足用户越来越多的市场分析需求. 市场营销人员最高兴的就是这个工具能够抓取各类活跃的项目数据,再统一分析,进行管理,进而做聪明人生意,而你只需一个Anametrix帮你负责处理数

阿里巴巴大数据计算平台MaxCompute(原名ODPS)全套攻略(持续更新20171122)

  概况介绍 大数据计算服务(MaxCompute,原名ODPS,产品地址:https://www.aliyun.com/product/odps)是一种快速.完全托管的TB/PB级数据仓库解决方案.MaxCompute向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全.本文收录了大量的MaxCompute产品介绍.技术介绍,帮助您快速了解MaxCompute/ODPS. MaxCompute 2.0:阿里巴巴的大数

MaxCompute文章索引

概况介绍: MaxCompute 2.0 生态开放之路及最新发展 10年老兵带你看尽MaxCompute大数据运算挑战与实践 一分钟了解阿里云产品:大数据计算服务MaxCompute概述 数加平台如何通过Serverless 架构实现普惠大数据 淘宝大数据之路 应用案例: 日志分析: 云数据,大计算-海量日志数据分析与应用 <海量日志数据分析与应用>之数据采集 <海量日志数据分析与应用>之社交数据分析:好友推荐 <海量日志数据分析与应用>之报表分析与展现 <海量日