今天主要是以一个数据分析者的角度来与大家分享如何使用spark进行大数据分析。
我将分以下4部分为大家进行介绍。首先介绍spark的相关背景,包括基本概念以及spark与hadoop的关系。接下来介绍如何使用spark RDD进行数据分析。之后分享spark与大数据分析的关系,以及spark在大数据分析中所起到的作用。最后,为大家分享一下我与四位小伙伴基于去年的SODA开放的交通数据做的案例:大型活动大规模人群的检测与疏散。
spark是一个快速易用的大规模数据计算框架,具有速度快、易使用、功能全的特点,并且可以与Hadoop很好地集成。
那么我们什么时候需要使用spark呢?首先,当我们需要处理的数据量超过了单机尺度(比如我们的计算机有4GB的内存,而我们需要处理100GB以上的数据)这时我们可以选择spark集群进行计算。有时我们可能需要处理的数据量并不大,但是计算很复杂,需要大量的时间,这时我们也可以选择利用spark集群强大的计算资源,并行化地计算。
spark可以提供了丰富的数据处理操作,包括在线的流式数据处理、离线的批量数据处理、即席查询、机器学习。
spark也提供了多种编程API接口,供具有不同开发经验的数据分析者使用。
spark与Hadoop是什么关系呢? Hadoop有两个核心模块,分布式存储模块HDFS和分布式计算模块Mapreduce。spark本身并没有提供分布式文件系统,因此spark的分析大多依赖于Hadoop的分布式文件系统HDFS。另一方面,Hadoop的Mapreduce与spark都可以进行数据计算,而相比于Mapreduce,spark的速度更快并且提供的功能更加丰富。
下面来介绍如何使用spark RDD进行编程。
首先介绍一下spark RDD,spark建立在统一抽象的RDD之上,RDD指的是一个只读的可分区的分布式数据集。可以将它的全部或部分缓存在内存中,供多次计算重用。而且RDD提供了多种友好的操作函数供数据分析者做数据处理。
spark为什么会在迭代计算中比hadoop快很多呢?Hadoop进行迭代数据处理时,需要把数据从HDFS中读出,分析,写回到HDFS中,再读出、分析、写回。在此过程中进行了大量的磁盘I/O操作,消耗了大量的时间。而spark可以将数据一次性地从HDFS读到内存中,并进行多次计算,因而减少了大量的开销。
通过spark RDD进行编程可以理解为利用RDD提供的算子、结合实际需求,设计一个数据处理的pipeline,将原始数据转换成我们需要得到的数据。RDD算子分为transformation和action,transformation是得到一个新的RDD,并且不会执行计算,直到遇到action算子的时候计算才会被触发。
这是一些常用的spark RDD算子。
下面来介绍如何使用spark RDD进行数据处理。总结起来可以分为以下三步:1.根据我们的目标定义好输入和输出数据的格式,并比较两者之间的差异;2.明确输入输出后我们根据RDD本身提供的算子以及自己定义的函数来设计pipeline;3.选择一种API编程实现。
我们以词频统计为例进行说明。我们希望对一段非结构化文本做词频统计,即统计一段文本中每个单词出现的次数,并将单词按照字母ASCII顺序升序排列。首先定义好我们的输入与输出数据格式,输入数据是一段介绍spark的文本,输出是逗号分隔的词频统计。
第二步设计算子pipeline,首先将数据从HDFS中读取,通过flatMap算子、map算子和reduceByKey算子统计出每个单词出现的频次,通过sortByKey算子将单词升序排列,再通过一个map算子转化成我们需要的目标格式,最后通过save算子将处理好的结果写回到HDFS中。
这是我们进行词频统计任务中使用的算子,包括4个transformation算子和一个action算子。
第三步我们来进行编程实现,在这里我们选择python进行编程。我们看到原本很复杂的pipeline,spark只需要短短的几行代码就可以实现,可见spark的强大功能以及对数据分析者提供的友好接口。
下面和大家介绍spark与大数据分析的关系。
数据分析一般需要进行两次创造。首先是第一次创造,即从整体上进行产品设计,找到一个好的应用问题,并思考问题是否有意义,数据源是否可靠,现有数据源可以解决该问题吗,是否需要其他数据源。在整体设计完成之后我们进行第二次创造,即在细节上通过技术实现,这个过程是一个不断迭代往复的过程。总结起来,数据分析,首先要找到正确的问题,然后再正确地分析数据。当然两者并非完全独立,比如对数据的基本统计往往会帮助我们不断深入地理解数据,进而发现问题。
下面介绍数据流与应用问题之间的关系,以及不同的数据分析工具在其中所起到的作用。在明确了应用问题,选择好了数据源之后,我们首先将原始数据转化为中间数据。原始数据往往量巨大(几百GB、TB级别),并且多是未经清洗的非结构化数据,因此我们需要用HDFS进行存储,使用大数据分析工具spark进行清洗压缩编码,得到结构化的中间数据,我们以后大部分的分析都可以基于中间数据进行。中间数据往往会比原始数据量小(几十GB),但单机仍然难以处理,因此也需要存储到HDFS中,使用spark/Hive进行进一步的处理,得到小数据。小数据大多是一些统计结果、提取的特征等等,数据量也相对较小(几MB至几GB),我们可以通过python、R语言等工具在单机上进行建模、分析,并将分析结果进行可视化,可以选择R语言、python绘制静态的统计图,也可以选择echarts、D3等工具进行交互展示。通过这些可视化的结果发现insight进而解决实际问题。
在大数据快速发展的今天,有多种多样的大数据分析工具应运而生,我们为什么要选择spark作为我们的大数据分析工具?相比于其他分析工具,spark具有哪些优势?ETL、机器学习、即席查询是大数据分析中非常重要的操作。已经有了一些大数据工具为此提供了解决方案,例如hadoop mapreduce解决大数据ETL、mahout解决大数据机器学习、hive解决大数据即席查询。然而这给数据分析者带来了不便,对于每一种大数据操作,都要学习一种新的技术,这带来了很大的学习成本。
那么我们会设想,会不会有一种工具,将常用的大数据分析功能统一起来呢?
spark经过近年来的飞速发展,已经做到“one stack to rule them all”,通过RDD将三者统一在了一起。数据分析者可以通过spark core大数据ETL,通过spark Mllib进行大数据机器学习,通过spark SQL进行大数据即席查询。
因此,数据分析者只需掌握spark一种工具,即可实现绝大多数的大数据分析功能。
最后,我来与大家分享一下我与其他4位小伙伴(上海交通大学的张宏伦、李铎、杨皓天,同济大学的金建栋)使用去年SODA的开放交通数据进行案例分析的一些结果:大型活动大规模人群的检测与疏散。
上海经常会举办大型活动,例如大型演唱会、足球赛等。这些大型活动会聚集大量的人群,有时会因为人数过多产生安全隐患,例如2015年新年上海外滩的踩踏事件。这些活动举办的时间地点不固定,也难以得知全部活动的信息,如果活动临时更改时间地点,也难以实时得到新的信息。
这给政府带来了公共安全的隐患。对于参加活动的人,在活动结束时,往往地铁已经停运,面对黑车的漫天要价,会面临回家难的问题。而现在市场上已经出现了一些专用巴士公司,他们希望寻找更多的客源创造更多的利润。然而三者之间联系脆弱、信息孤立。
我们希望以开放数据为基础,利用spark大数据分析技术,使用算法模型,通过交通数据识别出大型活动并提供疏散建议。为政府解决社会问题,为活动参加者解决回家难的问题,同时为专用巴士公司提供更多客源,创造更多利润。
我们选取了公交卡刷卡数据、出租车运行数据、地铁运行数据以及浦东公交车实时数据、气象数据。其中使用最多的是一卡通乘客刷卡数据,包含了2015年4月上海市的所有公交卡刷卡记录,涵盖用户1000万以上,交易记录2亿4千万条以上。
如此多的数据量单机难以处理,因此我们选择spark作为数据处理工具。这是我们的整体架构,首先根据我们的目标进行数据集扩充,包括从非常票务网、大麦网等票务网站爬取的各大活动的信息。之后进行数据预处理工作,包括数据去噪、数据融合等。之后进行数据分析挖掘,包括时序分析、空间挖掘、个体行为建模等,并将分析结果可视化。
我们对多种交通工具的每天出行时间分布进行了统计,可以看到地铁和公交车有着明显的早晚高峰,而出租车除了午夜时间一天的乘客数量较为平均。
我们对每天的交通总流量进行了分析,发现交通流量稳定,并以周为单位呈周期规律,而工作日的总流量要高于休息日。观察一周的总流量,周一到周四的交通流量基本相同,周五流量要略高于周一至周四,而周六流量要低于工作日,周日的流量为一周最低。
在分析完每天的交通总流量之后,我们分析了一天中各个时段的流量。选取了周一到周四工作日中的2天(一个晴天、一个雨天),工作日周五和工作日周六。我们发现两个周一到周四工作日的流量曲线几乎重合,因此我们可以推测,周一到周四的工作日不仅总流量稳定,而且各个时段的交通总流量稳定,且早晚高峰显著。而观察周五的流量,我们发现在大约10:00之前,流量曲线几乎与周一到周四的流量重合,而10:00以后几乎每个时段流量都会比平时高出一些,这解释了为什么周五的总流量会高于周一到周四。而周六的流量没有早晚高峰,但在空闲时段(如中午)流量要高于工作日。
我们分析了一个月以来地铁乘客的公交卡刷卡次数分布。上海乘坐一次地铁,进出需要刷卡2次,因此正常情况下,乘客的刷卡次数一定是偶数。从分布图中我们也可以观察到这一点,然而我们也发现也有一些乘客的刷卡次数呈奇数,这可能是设备故障或乘客逃票行为导致。另外,一个月来乘客的交易次数呈重尾分布,而且一个月中出行2次的乘客最多。
在分析了宏观上的流量之后,我们来分析个体的行为。我们用模序(motif)来对个体的行为进行抽象,即用有向图表示用户一天的轨迹。比如第二幅图中,乘客一天中先从站点1出发去2,再从站点2出发返回1,这是典型的通勤行为。我们发现乘客绝大多数的行为可以使用以上10种模序描述,因此绝大多数的乘客行为是规律的。我们也关注模序的变化,因为模序的变化暗示着行为的异常。比如某天大量用户的模序发生变化且都去一个共同的地点,那么他们很可能去参加同一场大型活动。
下面我们研究大型活动与交通流量的关系。这是中华艺术宫地铁站几天的客流量。平时情况下,客流量较少。4.18号晚中华艺术宫附近场馆举办了一场演唱会,可以看到这一天在活动开始前与结束后客流量大大增加,远高于平时,且出现了两个尖峰。因此大型活动确实对交通流量造成了较为显著的影响,我们通过交通数据来识别大型活动是可行的。
下面是我们使用spark技术,通过模型做出的大型活动识别结果,做图颜色表示地铁,例如蓝色代表8号线,小长方形表示地铁站点。右图表示一个月中哪一天算法检测出了大型活动,白色表示没有检测到,红色表示检测到。右侧两条曲线分别表示当日的客流量与历史平均的客流量。
最后,我们基于虹口足球场4月11日晚(一场足球赛)的交通数据进行了控制性模拟实验。我们发现,在未采取控制前,需要历史很长时间才可以完成疏散,而当使用巴士协助疏散之后,所花时间大大减少,这也降低了风险。同时,我们发现调配巴士数量越多、载客量越大疏散越快,但也有可能造成巴士资源浪费、造成损失,因此存在使得盈利最大和疏散最快的最优点,可以通过最优化模型得到。至此,我们以开放数据为基础,利用spark大数据技术和算法模型,对乘客解决了活动结束回家难问题,对专用巴士提供了更多客源增加其收益,同时帮助政府减少了公共安全风险。
最后从一个数据分析者的角度,总结一下个人对数据分析的理解。我们首先要根据实际需求找到应用问题,数据是我们的研究基础,spark/hadoop等技术是我们的分析工具,算法模型是我们的理论方法,而数据可视化是一种呈现信息的手段。
====================================分割线================================
本文转自d1net(转载)