任务
广告点击的在线黑名单过滤
使用 nc -lk 9999
在数据发送端口输入若干数据,比如:
1375864674543 Tom
1375864674553 Spy
1375864674571 Andy
1375864688436 Cheater
1375864784240 Kelvin
1375864853892 Steven
1375864979347 John
代码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
object OnlineBlackListFilter {
def main(args: Array[String]){
/**
* 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
* 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
*/
// 创建SparkConf对象
val conf = new SparkConf()
// 设置应用程序的名称,在程序运行的监控界面可以看到名称
conf.setAppName("OnlineBlackListFilter")
// 此时,程序在Spark集群
conf.setMaster("spark://Master:7077")
val ssc = new StreamingContext(conf, Seconds(30))
/**
* 黑名单数据准备,实际上黑名单一般都是动态的,例如在Redis或者数据库中,
* 黑名单的生成往往有复杂的业务逻辑,具体情况算法不同,
* 但是在Spark Streaming进行处理的时候每次都能够访问完整的信息。
*/
val blackList = Array(("Spy", true),("Cheater", true))
val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)
val adsClickStream = ssc.socketTextStream("Master", 9999)
/**
* 此处模拟的广告点击的每条数据的格式为:time、name
* 此处map操作的结果是name、(time,name)的格式
*/
val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) }
adsClickStreamFormatted.transform(userClickRDD => {
// 通过leftOuterJoin操作既保留了左侧用户广告点击内容的RDD的所有内容,
// 又获得了相应点击内容是否在黑名单中
val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)
/**
* 进行filter过滤的时候,其输入元素是一个Tuple:(name,((time,name), boolean))
* 其中第一个元素是黑名单的名称,第二元素的第二个元素是进行leftOuterJoin的时候是否存在的值。
* 如果存在的话,表明当前广告点击是黑名单,需要过滤掉,否则的话是有效点击内容;
*/
val validClicked = joinedBlackListRDD.filter(joinedItem => { if(joinedItem._2._2.getOrElse(false)) { false } else { true } })
validClicked.map(validClick => {validClick._2._1}) }).print
/**
* 计算后的有效数据一般都会写入Kafka中,下游的计费系统会从kafka中pull到有效数据进行计费
*/
ssc.start()
ssc.awaitTermination() }
}
}
**注:**
//把程序的Batch Interval设置从30秒改成300秒:
val ssc = new StreamingContext(conf, Seconds(300))
用spark-submit运行前面生成的jar包
/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.test.spark.sparkstreaming.Filter --master spark://Master:7077 /root/Documents/SparkApps/Filter.jar
分析
- 5个job
Job 0:不体现业务逻辑代码,对后面计算的负载均衡的考虑
Job 0包含有Stage 0、Stage 1。
比如Stage 1,其中的Aggregated Metrics by Executor部分:
Stage在所有Executor上都存在.
- Job 1:运行时间比较长,耗时1.5分钟
Stage 2,Aggregated Metrics By Executor部分:
Stage 2只在Worker上的一个Executor执行,而且执行了1.5分钟(4个worker),从业务处理的角度看,我们发送的那么一点数据,没有必要去启动一个运行1.5分钟的任务吧。那这个任务是做什么呢? 从DAG Visualization部分,就知道此Job实际就是启动了一个接收数据的Receiver:
Receiver是通过一个Job来启动的。那肯定有一个Action来触发它
Tasks部分:
只有一个Worker运行此Job,是用于接收数据。
Locality Level是PROCESS_LOCAL,原来是内存节点。所以,默认情况下,数据接收不会使用磁盘,而是直接使用内存中的数据。Spark Streaming应用程序启动后,自己会启动一些Job。默认启动了一个Job来接收数据,为后续处理做准备。一个Spark应用程序中可以启动很多Job,而这些不同的Job之间可以相互配合。 - Job 2:看Details可以发现有我们程序的主要业务逻辑,体现在Stag 3、Stag4、Stag 5中
Stag3、Stage4的详情,2个Stage都是用4个Executor执行的,所有数据处理是在4台机器上进行的。
![这里写图片描述](http://img.blog.csdn.net/20160511121339417
Stag 5只在Worker4上,因为这个Stage有Shuffle操作。
- Job3:有Stage 6、Stage 7、Stage 8。其中Stage 6、Stage 7被跳过
Stage 8的Aggregated Metrics by Executor部分。可以看到,数据处理是在4台机器上进行的: - Job4:也体现了我们应用程序中的业务逻辑 。有Stage 9、Stage 10、Stage 11。其中Stage 9、Stage
10被跳过
tage 11的详情。可以看到,数据处理是在Worker2之外的其它3台机器上进行的
总结
Spark Streaming本质
park Streaming接收Kafka、Flume、HDFS和Kinesis等各种来源的实时输入数据,进行处理后,处理结果保存在HDFS、Databases等各种地方。
Spark Streaming接收这些实时输入数据流,会将它们按批次划分,然后交给Spark引擎处理,生成按照批次划分的结果流。
Spark Streaming提供了表示连续数据流的、高度抽象的被称为离散流的DStream。DStream本质上表示RDD的序列。任何对DStream的操作都会转变为对底层RDD的操作。
Spark Streaming使用数据源产生的数据流创建DStream,也可以在已有的DStream上使用一些操作来创建新的DStream。
前面的代码每300秒会接收一批数据,基于这批数据会生成RDD,进而触发Job,执行处理。DStream是一个没有边界的集合,没有大小的限制。DStream代表了时空的概念。随着时间的推移,里面不断产生RDD。锁定到时间片后,就是空间的操作,也就是对本时间片的对应批次的数据的处理。
Spark Streaming程序转换为Spark执行的作业的过程中,使用了DStreamGraph,Spark Streaming程序中一般会有若干个对DStream的操作。DStreamGraph就是由这些操作的依赖关系构成。
从程序到DStreamGraph的转换:
从每个foreach开始,都会进行回溯。从后往前回溯这些操作之间的依赖关系,也就形成了DStreamGraph。
执行从DStream到RDD的转换,也就形成了RDD Graph:
空间维度确定之后,随着时间不断推进,会不断实例化RDD Graph,然后触发Job去执行处理。
深入官方文档(摘抄王家书籍):
Spark Core处理的每一步都是基于RDD的,RDD之间有依赖关系。上图中的RDD的DAG显示的是有3个Action,会触发3个job,RDD自下向上依 赖,RDD产生job就会具体的执行。从DSteam Graph中可以看到,DStream的逻辑与RDD基本一致,它就是在RDD的基础上加上了时间的依赖。RDD的DAG又可以叫空间维度,也就是说整个 Spark Streaming多了一个时间维度,也可以成为时空维度。
从这个角度来讲,可以将Spark Streaming放在坐标系中。其中Y轴就是对RDD的操作,RDD的依赖关系构成了整个job的逻辑,而X轴就是时间。随着时间的流逝,固定的时间间 隔(Batch Interval)就会生成一个job实例,进而在集群中运行。
对于Spark Streaming来说,当不同的数据来源的数据流进来的时候,基于固定的时间间隔,会形成一系列固定不变的数据集或event集合(例如来自flume 和kafka)。而这正好与RDD基于固定的数据集不谋而合,事实上,由DStream基于固定的时间间隔行程的RDD Graph正是基于某一个batch的数据集的。
从上图中可以看出,在每一个batch上,空间维度的RDD依赖关系都是一样 的,不同的是这个五个batch流入的数据规模和内容不一样,所以说生成的是不同的RDD依赖关系的实例,所以说RDD的Graph脱胎于DStream 的Graph,也就是说DStream就是RDD的模版,不同的时间间隔,生成不同的RDD Graph实例。
从Spark Streaming本身出发:
1.需要RDD DAG的生成模版:DStream Graph
2需要基于Timeline的job控制器
3需要inputStreamings和outputStreamings,代表数据的输入和输出
4具体的job运行在Spark Cluster之上,由于streaming不管集群是否可以消化掉,此时系统容错就至关重要
5事务处理,我们希望流进来的数据一定会被处理,而且只处理一次。在处理出现崩溃的情况下如何保证Exactly once的事务语意
从这里可以看出,DStream就是Spark Streaming的核心,就想Spark Core的核心是RDD,它也有dependency和compute。更为关键的是下面的代码:
这是一个HashMap,以时间为key,以RDD为value,这也正应证了随着时间流逝,不断的生成RDD,产生依赖关系的job,并通过jobScheduler在集群上运行。再次验证了DStream就是RDD的模版。
DStream可以说是逻辑级别的,RDD就是物理级别的,DStream所表达的最终都是通过RDD的转化实现的。前者是更高级别的抽象,后者是底层的实现。DStream实际上就是在时间维度上对RDD集合的封装,DStream与RDD的关系就是随着时间流逝不断的产生RDD,对DStream的操作就是在固定时间上操作RDD。
总结:
在 空间维度上的业务逻辑作用于DStream,随着时间的流逝,每个Batch Interval形成了具体的数据集,产生了RDD,对RDD进行transform操作,进而形成了RDD的依赖关系RDD DAG,形成job。然后jobScheduler根据时间调度,基于RDD的依赖关系,把作业发布到Spark Cluster上去运行,不断的产生Spark作业。