前言
阿里云在物联网提供整体的解决方案,包括IoT套件、大数据分析两个场景,解决了数据上云和数据分析的各种问题,如设备入网安全、数据转发、实时分析、离线分析模型等一整套链路贯通的智能方案。
本文以一个设备的监控的例子选择一个链路的实践,目的是演示联物网在阿里云的最上手的实践。
总体框架
通用的物联网解决方案,分为两个大的方面:设备数据上云、云上数据分析。大数据的部分可以通过MaxCompute建立和训练数据模型,应用用于实时数据,比如设备故障预测。
图中较为全面的抽象化了物联网的整个流程,数据从各种各样的设备如机械设备、人体穿戴设备、移动设备等接到IoT套件中,当设备支持标准的通信协议时,通过在设备端写SDK的方式,向云上传输数据,如老旧的设备甚至可以集成芯片去支持新的通信协议;阿里云支持如C/JAVA版本的MQTT及CCP协议,反过来通过云上服务器端也提供如:JAVA/Python/.NET/PHP等SDK来管理和控制设备,例如开、关灯等场景。
IoT套件的帮助页面提供更多的介绍以及SDK下载和样例。
本次实践以图中加粗红线的为主线实践整个解决方案中的最简易流程,目的整个让链路通过几个例子形象的串起来。
实现步骤
1.设备入网
本次实践中通过模拟设备在周期性的产生数据,然后生成一个数据列表,通过一定周期来读取这个列表文件来模拟周期性的设备产生数据,进而达到模拟的效果。手工生成一个JSON的数据格式,这种格式代表多数设备产生数据的格式。形如:
{ "nowtime": "2000-02-15 04:14:57", "deviceid": "15030011", "devicecategory": "WII", "location": { "lng": "107.06", "lat": "29.85" }, "pressure": { "InletPr": "62", "OutletPr": "95" }, "RunningTime": "197", "Frequency": { "No1": [ 23.809019088745117, 94.6796646118164 ], "No2": [ 23.67925262451172, 295.56805419921875 ], "No3": [ 24.824684143066406, 337.1220703125 ] }, "FreqFailure": [ 0, 0, 0 ] }
由于需要SDK程序固定周期读取一次数据(此处模拟效果,实际情况中,只需要在设备上用SDK根据设备产生数据的频率向云上写数据即可),所以这个数据文件列表里需要把每条JSON存成一行,形如:
通过阿里云官网上的IoT套件的SDK及例 实现相应的逻辑,在实现上传样例之前,需要在IoT的系统里创建相应的产品、设备、Topic(类似表,但无需要定义结构)等基础信息。其中在设备端程序需要信息如:
// 以下参数请到控制台申请 http://iot.console.aliyun.com/iot/ opts.setDeviceName("Client_Name");// 设备名称 opts.setDeviceSecret("device_sec");// 设备密钥 opts.setProductAppKey("product_key");// 产品证书key opts.setProductAppSecret("product_sec");// 产品证书私钥 opts.setAuthUrl("http://iot.channel.aliyun.com");// 认证地址
用户需要实现关键数据上传部分的逻辑,例如:
private static void testSubPub(final CcpClient s) { String topic = "/" + opts.getProductAppKey() + "/pump"; //指定接收数据的Topic InputStream inputStream = null; InputStreamReader reader = null; BufferedReader br = null; try{ File file = new File("device_source.json"); inputStream = new FileInputStream(file); reader = new InputStreamReader(inputStream); br = new BufferedReader(reader); String line = br.readLine(); //循环读取文件,以上述文件一行为一条json数据 while (line != null) { LogUtil.print("Starting pub topic!"); //日志输出 System.out.println(line); Publish msg2 = new Publish(); msg2.topic = topic; msg2.aliveSecond = 30; msg2.payload = line.getBytes(); //同步方式调用 PubAck result = s.publish(msg2); LogUtil.print("publish result=" + result); Thread.sleep(50000); LogUtil.print("50秒后 ,发送下一行数据pub请求,发布到队列=" + topic); line = br.readLine(); } } catch (Exception e) { e.printStackTrace(); } finally { try { if (br != null) br.close(); if (reader != null) reader.close(); if (inputStream != null) inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } }
2.规则引擎
数据进入IoT套件后,由规则引擎解析数据成结构化数据,规则引擎用类似SQL的语法去解析JSO的数据,但它不是标准的SQL语法,只是更已于用户理解。解析的部分形如:
SELECT nowtime,deviceid,devicecategory, "location.lng" as lng, "location.lat" as lat,RunningTime, "pressure.InletPr" as InletPr, "pressure.OutletPr" as outletPr, "Frequency.No1.[0]" as No1FreqTemp, "Frequency.No1.[1]" as No1FreqTotalKW, "Frequency.No2.[0]" as No2FreqTemp, "Frequency.No2.[1]" as No2FreqTotalKW, "Frequency.No3.[0]" as No3FreqTemp, "Frequency.No3.[1]" as No3FreqTotalKW, "FreqFailure.[0]" as No1FreqFailure, "FreqFailure.[1]" as No2FreqFailure, "FreqFailure.[2]" as No3FreqFailure FROM "/1000076497/pump" WHERE RunningTime>1
数据解析完成之后,添加转发方法,把数据转发至大数据平台的数据枢纽DataHub中。在者之前也应在DataHub中创建对应的Topic,Schema如:
nowtime STRING, deviceid BIGINT, devicecategory STRING, lng DOUBLE, lat DOUBLE, InletPr DOUBLE, OutletPr DOUBLE, RunningTime DOUBLE, No1FreqTemp DOUBLE, No1FreqTotalKW DOUBLE, No2FreqTemp DOUBLE, No2FreqTotalKW DOUBLE, No3FreqTemp DOUBLE, No3FreqTotalKW DOUBLE, No1FreqFailure BIGINT, No2FreqFailure BIGINT, No3FreqFailure BIGINT
3.DataHub数据分发
DataHub起着数据枢纽的作用,可将数据分发到离线规模计算、实时流计算、多维分析等。本文数据将以订阅的方式流向流计算,只要DataHub进来数据,就会触发流计算的任务。
数据也可归档到ODPS中用于建模等离线计算和历史数据沉淀。
4.流式计算分析
在流计算中创建以下的简单任务,形如:-------------------------- --DataHub数据源 -------------------------- CREATE STREAM TABLE device_condition ( nowtime STRING, deviceid BIGINT, devicecategory STRING, lng DOUBLE, lat DOUBLE, inletpr DOUBLE, outletpr DOUBLE, runningtime DOUBLE, no1freqtemp DOUBLE, no1freqtotalkw DOUBLE, no2freqtemp DOUBLE, no2freqtotalkw DOUBLE, no3freqtemp DOUBLE, no3freqtotalkw DOUBLE, no1freqfailure BIGINT, no2freqfailure BIGINT, no3freqfailure BIGINT ) WITH ( type='datahub', endpoint='http://dh-cn-hangzhou-internal.aliyuncs.com', roleArn=' XXXXX ', projectName='XXXX', topic='XXXX' ); ----------------------------------------- ---MySQL result ----------------------------------------- CREATE RESULT TABLE result_state ( gmtdate STRING, messagecount BIGINT, maxtemp DOUBLE, avgtemp DOUBLE, PRIMARY KEY (gmtdate) ) WITH ( type='rds', url='jdbc:mysql://XXXXXX.mysql.rds.aliyuncs.com:3306/iot_demo', username='XXX', password='XXX', tableName='result_state' ); insert into result_state select nowtime as gmtdate, count(deviceid) as messagecount, max(no1freqtemp) as maxtemp, sum(distinct no1freqtemp) as avgtemp from device_condition group by nowtime;
5.业务应用
业务部分,可以将流式计算的结果存入关系型数据库RDS/高并发读写的OTS等,或者做更多的业务分析应用,在此过程中也可以根据结果反过来控制设备,使其能动态的调整,以达到智能的效果。
6.数据可视化
联物网的数据需要实时分析并能及时从宏观层面得以呈现,可以用DataV来实现相应效果,参考教程 DataV实践 ,最后得到形如: