基于Spark的公安大数据实时运维技术实践

公安行业存在数以万计的前后端设备,前端设备包括相机、检测器及感应器,后端设备包括各级中心机房中的服务器、应用服务器、网络设备及机房动力系统,数量巨大、种类繁多的设备给公安内部运维管理带来了巨大挑战。传统通过ICMP/SNMP、Trap/Syslog等工具对设备进行诊断分析的方式已不能满足实际要求,由于公安内部运维管理的特殊性,现行通过ELK等架构的方式同样也满足不了需要。为寻求合理的方案,我们将目光转向开源架构,构建了一套适用于公安行业的实时运维管理平台。

实时运维平台整体架构

数据采集层:Logstash+Flume,负责在不同场景下收集、过滤各类前后端硬件设备输出的Snmp Trap、Syslog日志信息以及应用服务器自身产生的系统和业务日志;

数据传输层:采用高吞吐的分布式消息队列Kafka集群,保证汇聚的日志、消息的可靠传输;

数据处理层:由Spark实时Pull Kafka数据,通过Spark Streaming以及RDD操作进行数据流的处理以及逻辑分析;

数据存储层:实时数据存入MySQL中便于实时的业务应用和展示;全量数据存入ES以及HBase中便于后续的检索分析;

业务服务层:基于存储层,后续的整体业务应用涵盖了APM、网络监控、拓扑、告警、工单、CMDB等。

整体系统涉及的主要开源框架情况如下:

另外,整体环境基于JDK 8以及Scala 2.10.4。公安系统设备种类繁多,接下来将以交换机Syslog日志为例,详细介绍日志处理分析的整体流程。

图1 公安实时运维平台整体架构

Flume+Logstash日志收集

Flume是Cloudera贡献的一个分布式、可靠及高可用的海量日志采集系统,支持定制各类Source(数据源)用于数据收集,同时提供对数据的简单处理以及通过缓存写入Sink(数据接收端)的能力。

Flume中,Source、Channel及Sink的配置如下:

该配置通过syslog source配置localhost tcp 5140端口来接收网络设备发送的Syslog信息,event缓存在内存中,再通过KafkaSink将日志发送到kafka集群中名为“syslog-kafka”的topic中。

Logstash来自Elastic公司,专为收集、分析和传输各类日志、事件以及非结构化的数据所设计。它有三个主要功能:事件输入(Input)、事件过滤器(Filter)以及事件输出(Output),在后缀为.conf的配置文件中设置,本例中Syslog配置如下:

Input(输入)插件用于指定各种数据源,本例中的Logstash通过udp 514端口接收Syslog信息;

Filter(过滤器)插件虽然在本例中不需要配置,但它的功能非常强大,可以进行复杂的逻辑处理,包括正则表达式处理、编解码、k/v切分以及各种数值、时间等数据处理,具体可根据实际场景设置;

Output(输出)插件用于将处理后的事件数据发送到指定目的地,指定了Kafka的位置、topic以及压缩类型。在最后的Codec编码插件中,指定来源主机的IP地址(host)、Logstash处理的时间戳(@timestamp)作为前缀并整合原始的事件消息(message),方便在事件传输过程中判断Syslog信息来源。单条原始Syslog信息流样例如下:

147>12164: Oct 9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down

Logstash Output插件处理后的信息流变成为:

19.1.1.12 2016-10-13T10:04:54.520Z <147>12164: Oct 9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down

其中红色字段就是codec编码插件植入的host以及timestamp信息。处理后的Syslog信息会发送至Kafka集群中进行消息的缓存。

Kafka日志缓冲

Kafka是一个高吞吐的分布式消息队列,也是一个订阅/发布系统。Kafka集群中每个节点都有一个被称为broker的实例,负责缓存数据。Kafka有两类客户端,Producer(消息生产者的)和Consumer(消息消费者)。Kafka中不同业务系统的消息可通过topic进行区分,每个消息都会被分区,用以分担消息读写负载,每个分区又可以有多个副本来防止数据丢失。消费者在具体消费某个topic消息时,指定起始偏移量。Kafka通过Zero-Copy、Exactly Once等技术语义保证了消息传输的实时、高效、可靠以及容错性。

Kafka集群中某个broker的配置文件server.properties的部分配置如下:

其中需指定集群里不同broker的id,此台broker的id为1,默认监听9092端口,然后配置Zookeeper(后续简称zk)集群,再启动broker即可。

Kafka集群名为syslog-kafka的topic:

Kafka集群的topic以及partition等信息也可以通过登录zk来观察。然后再通过下列命令查看Kafka接收到的所有交换机日志信息:

部分日志样例如下:

Spark日志处理逻辑

Spark是一个为大规模数据处理而生的快速、通用的引擎,在速度、效率及通用性上表现极为优异。

在Spark主程序中,通过Scala的正则表达式解析Kafka Source中名为“syslog-kafka” 的topic中的所有Syslog信息,再将解析后的有效字段封装为结果对象,最后通过MyBatis近实时地写入MySQL中,供前端应用进行实时地可视化展示。另外,全量数据存储进入HBase及ES中,为后续海量日志的检索分析及其它更高级的应用提供支持。主程序示例代码如下:

整体的处理分析主要分为4步:

初始化SparkContext并指定Application的参数;

创建基于Kafka topic “syslog-kafka” 的DirectStream;

将获取的每一行数据映射为Syslog对象,调用Service进行对象封装并返回;

遍历RDD,记录不为空时保存或者更新Syslog信息到MySQL中。

Syslog POJO的部分基本属性如下:

SwSyslog实体中的基本属性对应Syslog中的接口信息,注解中的name对应MySQL中的表sw_syslog 以及各个字段,MyBatis完成成员属性和数据库结构的ORM(对象关系映射)。

程序中的SwSyslogService有两个主要功能:

encapsulateSwSyslog()将Spark处理后的每一行Syslog通过Scala的正则表达式解析为不同的字段,然后封装并返回Syslog对象;遍历RDD分区生成的每一个Syslog对象中都有ip以及接口信息,saveSwSyslog()会据此判断该插入还是更新Syslog信息至数据库。另外,封装好的Syslog对象通过ORM工具MyBatis与MySQL进行互操作。

获取到的每一行Syslog信息如之前所述:

这段信息需解析为设备ip、服务器时间、信息序号、设备时间、Syslog类型、属性、设备接口、接口状态等字段。Scala正则解析逻辑如下:

通过正则过滤、Syslog封装以及MyBatis持久层映射,Syslog接口状态信息最终解析如下:

最后,诸如APM、网络监控或者告警等业务应用便可以基于MySQL做可视化展示。

总结

本文首先对公安运维管理现状做了简要介绍,然后介绍公安实时运维平台的整体架构,再以交换机Syslog信息为例,详细介绍如何使用Flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析,对处理过程中大量的技术细节进行了描述并通过代码详细地介绍整体处理步骤。本文中的示例实时地将数据写入MySQL存在一定的性能瓶颈,后期会对包含本例的相关代码重构,数据将会实时写入HBase来提高性能。

本文作者:佚名

来源:51CTO

时间: 2024-10-24 17:52:25

基于Spark的公安大数据实时运维技术实践的相关文章

北京青苔数据助力山东财经大学构建大数据商务分析实验室,全面提升高校大数据实训和科研能力

北京青苔数据科技有限公司通过使用阿里云大数据实验室和青苔大数据实验室产品协助山东财经大学管理科学和工程学院建立了--大数据商务分析实验室.通过这个实验室,山东财经大学管理科学和工程学院能够利用大数据.人工智能和云计算等技术以及行业案例,实现对本科和研究生学生的实训教学,帮助老师和研究生学生进行各种大数据科研创新. 客户档案 山东财经大学是财政部.教育部.山东省共建高校,坐落于名泉喷涌的国家历史文化名城--济南,是一所办学历史悠久.办学规模较大.办学特色鲜明,以经济学和管理学科为主,兼有文学.法学

基于RFID的公安、交通流动巡逻解决方案

随着社会的发展,中国的国民经济实力不断增强,各种现代化的产品正在成为国民的消费新宠,汽车已经越来越多的进入普通市民的家庭,由于各种历史的原因,车辆在我国大量出现后,一系列的管理问题摆在了政府管理者的面前:车辆使用假牌证,脱逃各种管理费用,非法变更车辆的用途--等等,在落后的管理手段面前,我们的管理者一时束手无策,为此我们研发出一套基于http://www.aliyun.com/zixun/aggregation/16929.html">RFID的公安.交通流动巡逻解决方案. 一.RFID介

基于Spark on Yarn 的淘宝数据挖掘平台

基于Spark on Yarn 的淘宝数据挖掘平台 淘宝技术部--数据挖掘与计算 为什么选择Spark On Yarn Spark On Yarn的原理和框架 淘宝在Spark On Yarn上做的工作 基于Spark on Yarn 的淘宝数据挖掘平台

(课程)基于Spark的机器学习经验

Hi,大家好!我是祝威廉,本来微博也想叫祝威廉的,可惜被人占了,于是改名叫·祝威廉二世.然后总感觉哪里不对.目前在乐视云数据部门里从事实时计算,数据平台.搜索和推荐等多个方向.曾从事基础框架,搜索研发四年,大数据平台架构.推荐三年多,个人时间现专注于集群自动化部署,服务管理,资源自动化调度等方向. 今天会和大家分享三个主题. 不过限于时间,第三个只是会简单提及下, 等未来有机会可以更详细的分享. 如何基于Spark做机器学习(Spark-Shell其实也算的上即席查询了) 基于Spark做新词发

基于Spark 的抄袭检测云计算框架研究

基于Spark 的抄袭检测云计算框架研究 于海浩 抄袭检测从根本上说是一个文本相似度的计算问题,需要迅速准确的在海量文集中对文本的原创性进行检测,耗费大量时间和资源,是计算密集和数据密集的复杂过程.采用分布式计算是是提高检测效率的有有效手段之一.本文提出了一套基于Spark的分布式抄袭检测云计算框架该框架使用由集群资源管理器Apache Mesos,支持内存驻留的 MapReduce计算框架,分布式 Hadooop 文件系统构成的分布式计算集群.测试结果表明,此框架比Hadooop传统分布式计算

如何基于Spark Streaming构建实时计算平台

1.前言 随着互联网技术的迅速发展,用户对于数据处理的时效性.准确性与稳定性要求越来越高,如何构建一个稳定易用并提供齐备的监控与预警功能的实时计算平台也成了很多公司一个很大的挑战. 自2015年携程实时计算平台搭建以来,经过两年多不断的技术演进,目前实时集群规模已达上百台,平台涵盖各个SBU与公共部门数百个实时应用,全年JStorm集群稳定性达到100%.目前实时平台主要基于JStorm与Spark Streaming构建而成,相信关注携程实时平台的朋友在去年已经看到一篇关于携程实时平台的分享:

大数据实训室助力国家高校人才梯队建设

5天,60小时,不间断的持续学习,是什么内容使得已经工作多年的教师还能如此热情高涨的学习?答案就是:新华三大数据教师培训. 2016年11月21~25日,新华三集团在杭州总部举办"2016新华三首期大数据教师培训",吸引了来自国内多所高校计算机系.信息中心的30余位老师参加.培训中,来自新华三集团的研发专家结合自主开发的大数据理论及实验教材,详细讲解了大数据发展趋势.技术理论,并通过模拟部分行业的应用场景,带领老师完成大数据基础组件使用.数据挖掘算法编写.大数据集群规划实践,帮助参训教

【Spark Summit East 2017】基于Spark ML和GraphFrames的大规模文本分析管道

本讲义出自Alexey Svyatkovskiy在Spark Summit East 2017上的演讲,主要介绍了基于Spark ML和GraphFrames的大规模文本分析管道的实现,并介绍了用于的描绘直方图.计算描述性统计的跨平台的Scala数据聚合基元--Histogrammar package,并分享了非结构化数据处理.高效访问的数据存储格式以及大规模图处理等问题.

【Spark Summit EU 2016】基于Spark与Cassandra的电信产品化解决方案

本讲义出自Brij Bhushan Ravat在Spark Summit EU上的演讲,主要介绍了爱立信公司研发的基于Spark与Cassandra的电信产品化解决方案Voucher Server. Brij Bhushan Ravat从什么是产品化这个命题入手,分享了关于产品和Voucher Server 进化的观点,并对Voucher Server这款产品进行了简单介绍,并分享了Voucher Server面对的挑战与其发展进化的过程以及关于产品的运行和维护的挑战.