阿里云数加平台对物联网数据的实时流式分析实践--设备监控应用

前言

  阿里云在物联网提供整体的解决方案,包括IoT套件、大数据分析两个场景,解决了数据上云和数据分析的各种问题,如设备入网安全、数据转发、实时分析、离线分析模型等一整套链路贯通的智能方案。

  本文以一个设备的监控的例子选择一个链路的实践,目的是演示联物网在阿里云的最上手的实践。

总体框架

 通用的物联网解决方案,分为两个大的方面:设备数据上云、云上数据分析。大数据的部分可以通过MaxCompute建立和训练数据模型,应用用于实时数据,比如设备故障预测。

         图中较为全面的抽象化了物联网的整个流程,数据从各种各样的设备如机械设备、人体穿戴设备、移动设备等接到IoT套件中,当设备支持标准的通信协议时,通过在设备端写SDK的方式,向云上传输数据,如老旧的设备甚至可以集成芯片去支持新的通信协议;阿里云支持如C/JAVA版本的MQTT及CCP协议,反过来通过云上服务器端也提供如:JAVA/Python/.NET/PHP等SDK来管理和控制设备,例如开、关灯等场景。

IoT套件的帮助页面提供更多的介绍以及SDK下载和样例。

本次实践以图中加粗红线的为主线实践整个解决方案中的最简易流程,目的整个让链路通过几个例子形象的串起来。

实现步骤

1.设备入网

本次实践中通过模拟设备在周期性的产生数据,然后生成一个数据列表,通过一定周期来读取这个列表文件来模拟周期性的设备产生数据,进而达到模拟的效果。

手工生成一个JSON的数据格式,这种格式代表多数设备产生数据的格式。形如:

{
    "nowtime": "2000-02-15 04:14:57",
    "deviceid": "15030011",
    "devicecategory": "WII",
    "location": {
        "lng": "107.06",
        "lat": "29.85"
    },
    "pressure": {
        "InletPr": "62",
        "OutletPr": "95"
    },
    "RunningTime": "197",
    "Frequency": {
        "No1": [
            23.809019088745117,
            94.6796646118164
        ],
        "No2": [
            23.67925262451172,
            295.56805419921875
        ],
        "No3": [
            24.824684143066406,
            337.1220703125
        ]
    },
    "FreqFailure": [
        0,
        0,
        0
    ]
}

由于需要SDK程序固定周期读取一次数据(此处模拟效果,实际情况中,只需要在设备上用SDK根据设备产生数据的频率向云上写数据即可),所以这个数据文件列表里需要把每条JSON存成一行,形如:


通过阿里云官网上的IoT套件的SDK及例 实现相应的逻辑,在实现上传样例之前,需要在IoT的系统里创建相应的产品、设备、Topic(类似表,但无需要定义结构)等基础信息。

其中在设备端程序需要信息如:

        // 以下参数请到控制台申请 http://iot.console.aliyun.com/iot/
        opts.setDeviceName("Client_Name");// 设备名称
        opts.setDeviceSecret("device_sec");// 设备密钥
        opts.setProductAppKey("product_key");// 产品证书key
        opts.setProductAppSecret("product_sec");// 产品证书私钥
        opts.setAuthUrl("http://iot.channel.aliyun.com");// 认证地址

用户需要实现关键数据上传部分的逻辑,例如:

 private static void testSubPub(final CcpClient s) {
        String topic = "/" + opts.getProductAppKey() + "/pump"; //指定接收数据的Topic

            InputStream inputStream = null;
            InputStreamReader reader = null;
            BufferedReader br = null;
            try{
              File file = new File("device_source.json");
              inputStream = new FileInputStream(file);
              reader = new InputStreamReader(inputStream);
              br = new BufferedReader(reader);
              String line = br.readLine();  //循环读取文件,以上述文件一行为一条json数据
              while (line != null) {
                  LogUtil.print("Starting pub topic!"); //日志输出
                  System.out.println(line);
                  Publish msg2 = new Publish();
                  msg2.topic = topic;
                  msg2.aliveSecond = 30;
                  msg2.payload = line.getBytes();
                  //同步方式调用
                  PubAck result = s.publish(msg2);
                  LogUtil.print("publish result=" + result);

                  Thread.sleep(50000);
                  LogUtil.print("50秒后 ,发送下一行数据pub请求,发布到队列=" + topic);
                line = br.readLine();
              }

        } catch (Exception e) {
            e.printStackTrace();
        }  finally {
              try {
                if (br != null)
                  br.close();
                if (reader != null)
                  reader.close();
                if (inputStream != null)
                  inputStream.close();
              } catch (IOException e) {
                e.printStackTrace();
              }
            }
    }

2.规则引擎

数据进入IoT套件后,由规则引擎解析数据成结构化数据,规则引擎用类似SQL的语法去解析JSO的数据,但它不是标准的SQL语法,只是更已于用户理解。解析的部分形如:

SELECT
nowtime,deviceid,devicecategory,
"location.lng" as lng,
"location.lat" as lat,RunningTime,
"pressure.InletPr" as InletPr,
"pressure.OutletPr" as outletPr,
"Frequency.No1.[0]" as No1FreqTemp,
"Frequency.No1.[1]" as No1FreqTotalKW,
"Frequency.No2.[0]" as No2FreqTemp,
"Frequency.No2.[1]" as No2FreqTotalKW,
"Frequency.No3.[0]" as No3FreqTemp,
"Frequency.No3.[1]" as No3FreqTotalKW,
"FreqFailure.[0]" as No1FreqFailure,
"FreqFailure.[1]" as No2FreqFailure,
"FreqFailure.[2]" as No3FreqFailure
FROM "/1000076497/pump" WHERE RunningTime>1

数据解析完成之后,添加转发方法,把数据转发至大数据平台的数据枢纽DataHub中。在者之前也应在DataHub中创建对应的Topic,Schema如:

	nowtime               STRING,
	deviceid              BIGINT,
	devicecategory        STRING,
	lng                   DOUBLE,
	lat                   DOUBLE,
	InletPr               DOUBLE,
	OutletPr              DOUBLE,
	RunningTime           DOUBLE,
	No1FreqTemp           DOUBLE,
	No1FreqTotalKW        DOUBLE,
	No2FreqTemp           DOUBLE,
	No2FreqTotalKW        DOUBLE,
	No3FreqTemp           DOUBLE,
	No3FreqTotalKW        DOUBLE,
	No1FreqFailure        BIGINT,
	No2FreqFailure        BIGINT,
	No3FreqFailure        BIGINT

3.DataHub数据分发

DataHub起着数据枢纽的作用,可将数据分发到离线规模计算、实时流计算、多维分析等。本文数据将以订阅的方式流向流计算,只要DataHub进来数据,就会触发流计算的任务。

数据也可归档到ODPS中用于建模等离线计算和历史数据沉淀。

4.流式计算分析

在流计算中创建以下的简单任务,形如:

--------------------------
--DataHub数据源
--------------------------
CREATE STREAM TABLE device_condition (
	nowtime                   STRING,
	deviceid                  BIGINT,
	devicecategory            STRING,
	lng                       DOUBLE,
	lat                       DOUBLE,
	inletpr                   DOUBLE,
	outletpr                  DOUBLE,
	runningtime               DOUBLE,
	no1freqtemp               DOUBLE,
	no1freqtotalkw            DOUBLE,
	no2freqtemp               DOUBLE,
	no2freqtotalkw            DOUBLE,
	no3freqtemp               DOUBLE,
	no3freqtotalkw            DOUBLE,
	no1freqfailure            BIGINT,
	no2freqfailure            BIGINT,
	no3freqfailure            BIGINT
) WITH (
	type='datahub',
	endpoint='http://dh-cn-hangzhou-internal.aliyuncs.com',
	roleArn=' XXXXX ',
	projectName='XXXX',
	topic='XXXX'
);

-----------------------------------------
---MySQL result
-----------------------------------------
CREATE RESULT TABLE result_state (
	gmtdate                 STRING,
	messagecount            BIGINT,
	maxtemp                 DOUBLE,
	avgtemp                 DOUBLE,
	PRIMARY KEY (gmtdate)
) WITH (
	type='rds',
	url='jdbc:mysql://XXXXXX.mysql.rds.aliyuncs.com:3306/iot_demo',
	username='XXX',
	password='XXX',
	tableName='result_state'
);

insert into result_state
 select
   nowtime as gmtdate,
   count(deviceid) as messagecount,
   max(no1freqtemp) as maxtemp,
   sum(distinct no1freqtemp) as avgtemp
   from device_condition group by nowtime;

5.业务应用

业务部分,可以将流式计算的结果存入关系型数据库RDS/高并发读写的OTS等,或者做更多的业务分析应用,在此过程中也可以根据结果反过来控制设备,使其能动态的调整,以达到智能的效果。

6.数据可视化

联物网的数据需要实时分析并能及时从宏观层面得以呈现,可以用DataV来实现相应效果,参考教程 DataV实践 ,最后得到形如:

时间: 2024-08-02 15:32:25

阿里云数加平台对物联网数据的实时流式分析实践--设备监控应用的相关文章

基于阿里云数加平台的大数据Serverless实践

本文PPT来自班输于10月16日在2016年杭州云栖大会上发表的<基于阿里云数加平台的大数据Serverless实践>. 数加是阿里云大数据的品牌名,其旗下包含一系列的大数据产品及服务,可以为用户提供一站式的数据开发.分析.应用平台.数加提供的服务包括智能语音/图象/视频分析服务.企业级数据仓库服务,地理信息可视化服务,风险预警与管控服务等等.其在基础平台的大数据产品包括数据开发.机器学习.大数据计算.分析型数据库.流计算,在数据应用层的产品包括数据可视化DataV.推荐引擎.人脸识别等等.

Serverless理念的弄潮儿—— 阿里云数加平台助力大数据普惠

免费开通大数据服务:https://www.aliyun.com/product/odps 阿里云坚持将计算能力变成像水电煤一样的公共服务,提供给大众,而非单单而不是卖服务器给客户,这跟今日流行的Serverless 架构理念是一致的.Serverless 理念在数加平台得到了很好的体现,数加平台今天已经可以提供很多业务场景化的计算服务,比如推荐引擎,规则引擎,以及各种人工智能的服务,助力企业在DT时代更敏捷.更智能.更具洞察力.在本文中,班输从数据平台简介.大数据应用特点.数加平台Server

袋鼠云 | 基于阿里云数加平台,助力知名物流企业进行大数据应用

关于申通易物流 上海申通易物流有限公司是申通旗下的一家集电子商务.仓储.传统物流为一体的服务型公司.公司应电子商务大发展时代需求而生,拥有自主研发的易物流仓内WMS(仓库管理系统).EDI(数据接口平台)及OMS(订单处理系统)等,为品牌.商家提供电子商务仓配解决方案及专业电子商务第三方精细化仓配一体化服务,协助电商解决电子商务供应链的管理. 申通易物流依托于申通快递在快递行业的品牌影响力和全国领先的快递配送网络,以及在电子商务行业的多年服务经验,致力打造成为一个专业的电子商务服务提供商,为客户

小议阿里云&quot;数加&quot;平台对企业有何帮助?

文章讲的是小议阿里云"数加"平台对企业有何帮助,阿里云发布大数据平台"数加",让DT时代不再只是阿里巴巴鼓吹的一个概念,而是实实在在的落地了!通过数据倒推本质,意味着一个新的时代来临,而这对企业和个人开发者又意味着什么? 1月20日,阿里云在2016云栖大会上海峰会上发布了一站式大数据平台"数加",工欲善其事必先利其器,首批亮相的就有多达20款产品,覆盖数据采集.计算引擎.数据加工.数据分析.机器学习.数据应用等数据生产全链条. 阿里云大数据事业

基于阿里云数加平台,袋鼠云助力光伏发电企业进行光伏发电预测

关于固德威 江苏固德威电源科技股份有限公司(以下简称:"固德威")是一家新能源高新技术企业,公司总部位于东方水城苏州高新区,一直专注于太阳能光伏逆变器及其监控产品的研发.生产及销售.固德威产品立足中国,并已大规模销往澳大利亚.德国.英国.法国.荷兰.比利时.丹麦.希腊.土耳其.印度.马来西亚.南非.墨西哥.巴西等三十多个国家,产品被广泛应用于住宅.商用屋顶系统以及光伏电站项目,其稳定的表现和优异的性能得到用户的普遍认可.由于良好的口碑,固德威得到广大客户的认可,公司业务增长迅速. 项目

2017大数据标准化论坛发布了第一批大数据系统测试结果,阿里云数加获得了大数据系统测试证书。

2017年3月18日, 2017大数据标准化论坛在北京成功召开.本次论坛由工业和信息化部信息化和软件服务业司和国家标准化管理委员会工业标准二部指导,中国电子技术标准化研究院和全国信标委大数据标准工作组共同主办.全国信标委大数据标准工作组组长梅宏院士.工信部信软司李冠宇副司长.国家标准化管理委员会工业二部刘大山副处长.大数据标准工作组高林秘书长.工信部信软司孙文龙处长,贵州.上海.四川.宁夏等产业主管部门领导,以及全国范围内的产.学.研.用300余位代表参加了会议,围绕大数据标准化工作.大数据技术

【Best Practice】基于阿里云数加·MaxCompute及Quick BI构建网站用户画像分析

前文背景:[Best Practice]基于阿里云数加·StreamCompute快速构建网站日志实时分析大屏   开通阿里云数加产品 前提条件 为了保证整个实验的顺利开展,需要用户使用开通相关产品及服务,包括DataHub.MaxCompute.AnalyticDB.Data IDE.Quick BI.      业务场景 数据来源于网站上的HTTP访问日志数据,基于这份网站日志来实现如下分析需求: n   统计并展现网站的PV和UV,并能够按照用户的终端类型(如Android.iPad.iP

【阿里云资讯】如何在阿里云数加平台实践Serverless架构?

导读 移动互联网.物联网和大数据应用的快速发展极大地促进了人们对云计算的需求.但是让应用架构拥有良好的可伸缩性和高可用性并非易事,运维和管控庞大的基础架构更是极大的挑战.近年来,一个新的架构风格Serverless成了热门话题. What is Serverless Serverless是一种基于互联网的技术架构理念.采用FAAS(Function as a Service)架构,通过功能组合来实现应用程序逻辑.该架构能够让开发者在构建应用的过程中无需关注计算资源的获取和运维,由平台来按需分配计

阿里云数加助力东润环能开启新能源大数据时代

北京东润环能科技股份有限公司(以下简称"东润环能")是一家从事新能源电力领域的数据信息服务公司,该司开创之初,提供了新能源发电功率预测系统.电网调度管理与支持系统.新能源城市规划与咨询服务等基础性产品,并逐步打造三大新能源互联网智慧服务生态圈平台,包括新能源投资开发生态圈第一平台.绿色电力交易与智慧用能生态圈第一平台.新能源资产智慧营运生态圈第一平台,在新能源产业金融领域将打造新能源产业数据挖掘投资服务系统.此前东润环能租用虚拟机,在虚拟机上自行搭建hadoop集群用于分析当天生产的气