Spark-Spark Streaming-广告点击的在线黑名单过滤

任务

广告点击的在线黑名单过滤
使用
nc -lk 9999
在数据发送端口输入若干数据,比如:

1375864674543 Tom
1375864674553 Spy
1375864674571 Andy
1375864688436 Cheater
1375864784240 Kelvin
1375864853892 Steven
1375864979347 John

代码

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds

object OnlineBlackListFilter {
    def main(args: Array[String]){
            /**
             * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
             * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
             */
             // 创建SparkConf对象
             val conf = new SparkConf()
             // 设置应用程序的名称,在程序运行的监控界面可以看到名称
             conf.setAppName("OnlineBlackListFilter")
             // 此时,程序在Spark集群
             conf.setMaster("spark://Master:7077")
             val ssc = new StreamingContext(conf, Seconds(30))
             /**
              * 黑名单数据准备,实际上黑名单一般都是动态的,例如在Redis或者数据库中,
              * 黑名单的生成往往有复杂的业务逻辑,具体情况算法不同,
              * 但是在Spark Streaming进行处理的时候每次都能够访问完整的信息。
              */
             val blackList = Array(("Spy", true),("Cheater", true))
             val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)
             val adsClickStream = ssc.socketTextStream("Master", 9999)
             /**
              * 此处模拟的广告点击的每条数据的格式为:time、name
              * 此处map操作的结果是name、(time,name)的格式
              */
             val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) }
             adsClickStreamFormatted.transform(userClickRDD => {
                  // 通过leftOuterJoin操作既保留了左侧用户广告点击内容的RDD的所有内容,
                  // 又获得了相应点击内容是否在黑名单中
                  val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)
                  /**
                   * 进行filter过滤的时候,其输入元素是一个Tuple:(name,((time,name), boolean))
                   * 其中第一个元素是黑名单的名称,第二元素的第二个元素是进行leftOuterJoin的时候是否存在的值。
                   * 如果存在的话,表明当前广告点击是黑名单,需要过滤掉,否则的话是有效点击内容;
                   */
                      val validClicked = joinedBlackListRDD.filter(joinedItem => { if(joinedItem._2._2.getOrElse(false)) { false } else { true } })
                  validClicked.map(validClick => {validClick._2._1}) }).print
                  /**
                   * 计算后的有效数据一般都会写入Kafka中,下游的计费系统会从kafka中pull到有效数据进行计费
                   */
                  ssc.start()
                  ssc.awaitTermination() }
     }
}
 **注:**
 //把程序的Batch Interval设置从30秒改成300秒:
 val ssc = new StreamingContext(conf, Seconds(300))

用spark-submit运行前面生成的jar包
/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.test.spark.sparkstreaming.Filter --master spark://Master:7077 /root/Documents/SparkApps/Filter.jar

分析

  • 5个job

Job 0:不体现业务逻辑代码,对后面计算的负载均衡的考虑

Job 0包含有Stage 0、Stage 1。
比如Stage 1,其中的Aggregated Metrics by Executor部分:

Stage在所有Executor上都存在.

  • Job 1:运行时间比较长,耗时1.5分钟

    Stage 2,Aggregated Metrics By Executor部分:

    Stage 2只在Worker上的一个Executor执行,而且执行了1.5分钟(4个worker),从业务处理的角度看,我们发送的那么一点数据,没有必要去启动一个运行1.5分钟的任务吧。那这个任务是做什么呢? 从DAG Visualization部分,就知道此Job实际就是启动了一个接收数据的Receiver:

    Receiver是通过一个Job来启动的。那肯定有一个Action来触发它
    Tasks部分:

    只有一个Worker运行此Job,是用于接收数据。
    Locality Level是PROCESS_LOCAL,原来是内存节点。所以,默认情况下,数据接收不会使用磁盘,而是直接使用内存中的数据。Spark Streaming应用程序启动后,自己会启动一些Job。默认启动了一个Job来接收数据,为后续处理做准备。一个Spark应用程序中可以启动很多Job,而这些不同的Job之间可以相互配合。
  • Job 2:看Details可以发现有我们程序的主要业务逻辑,体现在Stag 3、Stag4、Stag 5中

Stag3、Stage4的详情,2个Stage都是用4个Executor执行的,所有数据处理是在4台机器上进行的。
![这里写图片描述](http://img.blog.csdn.net/20160511121339417

Stag 5只在Worker4上,因为这个Stage有Shuffle操作。

  • Job3:有Stage 6、Stage 7、Stage 8。其中Stage 6、Stage 7被跳过

    Stage 8的Aggregated Metrics by Executor部分。可以看到,数据处理是在4台机器上进行的:
  • Job4:也体现了我们应用程序中的业务逻辑 。有Stage 9、Stage 10、Stage 11。其中Stage 9、Stage
    10被跳过

    tage 11的详情。可以看到,数据处理是在Worker2之外的其它3台机器上进行的

总结

Spark Streaming本质

park Streaming接收Kafka、Flume、HDFS和Kinesis等各种来源的实时输入数据,进行处理后,处理结果保存在HDFS、Databases等各种地方。
Spark Streaming接收这些实时输入数据流,会将它们按批次划分,然后交给Spark引擎处理,生成按照批次划分的结果流。
Spark Streaming提供了表示连续数据流的、高度抽象的被称为离散流的DStream。DStream本质上表示RDD的序列。任何对DStream的操作都会转变为对底层RDD的操作。
Spark Streaming使用数据源产生的数据流创建DStream,也可以在已有的DStream上使用一些操作来创建新的DStream。
前面的代码每300秒会接收一批数据,基于这批数据会生成RDD,进而触发Job,执行处理。DStream是一个没有边界的集合,没有大小的限制。DStream代表了时空的概念。随着时间的推移,里面不断产生RDD。锁定到时间片后,就是空间的操作,也就是对本时间片的对应批次的数据的处理。

Spark Streaming程序转换为Spark执行的作业的过程中,使用了DStreamGraph,Spark Streaming程序中一般会有若干个对DStream的操作。DStreamGraph就是由这些操作的依赖关系构成。
从程序到DStreamGraph的转换:

从每个foreach开始,都会进行回溯。从后往前回溯这些操作之间的依赖关系,也就形成了DStreamGraph。
执行从DStream到RDD的转换,也就形成了RDD Graph:

空间维度确定之后,随着时间不断推进,会不断实例化RDD Graph,然后触发Job去执行处理。

深入官方文档(摘抄王家书籍):


Spark Core处理的每一步都是基于RDD的,RDD之间有依赖关系。上图中的RDD的DAG显示的是有3个Action,会触发3个job,RDD自下向上依 赖,RDD产生job就会具体的执行。从DSteam Graph中可以看到,DStream的逻辑与RDD基本一致,它就是在RDD的基础上加上了时间的依赖。RDD的DAG又可以叫空间维度,也就是说整个 Spark Streaming多了一个时间维度,也可以成为时空维度。
从这个角度来讲,可以将Spark Streaming放在坐标系中。其中Y轴就是对RDD的操作,RDD的依赖关系构成了整个job的逻辑,而X轴就是时间。随着时间的流逝,固定的时间间 隔(Batch Interval)就会生成一个job实例,进而在集群中运行。

对于Spark Streaming来说,当不同的数据来源的数据流进来的时候,基于固定的时间间隔,会形成一系列固定不变的数据集或event集合(例如来自flume 和kafka)。而这正好与RDD基于固定的数据集不谋而合,事实上,由DStream基于固定的时间间隔行程的RDD Graph正是基于某一个batch的数据集的。
从上图中可以看出,在每一个batch上,空间维度的RDD依赖关系都是一样 的,不同的是这个五个batch流入的数据规模和内容不一样,所以说生成的是不同的RDD依赖关系的实例,所以说RDD的Graph脱胎于DStream 的Graph,也就是说DStream就是RDD的模版,不同的时间间隔,生成不同的RDD Graph实例。
从Spark Streaming本身出发:
1.需要RDD DAG的生成模版:DStream Graph
2需要基于Timeline的job控制器
3需要inputStreamings和outputStreamings,代表数据的输入和输出
4具体的job运行在Spark Cluster之上,由于streaming不管集群是否可以消化掉,此时系统容错就至关重要
5事务处理,我们希望流进来的数据一定会被处理,而且只处理一次。在处理出现崩溃的情况下如何保证Exactly once的事务语意

从这里可以看出,DStream就是Spark Streaming的核心,就想Spark Core的核心是RDD,它也有dependency和compute。更为关键的是下面的代码:

这是一个HashMap,以时间为key,以RDD为value,这也正应证了随着时间流逝,不断的生成RDD,产生依赖关系的job,并通过jobScheduler在集群上运行。再次验证了DStream就是RDD的模版。
DStream可以说是逻辑级别的,RDD就是物理级别的,DStream所表达的最终都是通过RDD的转化实现的。前者是更高级别的抽象,后者是底层的实现。DStream实际上就是在时间维度上对RDD集合的封装,DStream与RDD的关系就是随着时间流逝不断的产生RDD,对DStream的操作就是在固定时间上操作RDD。
总结:
在 空间维度上的业务逻辑作用于DStream,随着时间的流逝,每个Batch Interval形成了具体的数据集,产生了RDD,对RDD进行transform操作,进而形成了RDD的依赖关系RDD DAG,形成job。然后jobScheduler根据时间调度,基于RDD的依赖关系,把作业发布到Spark Cluster上去运行,不断的产生Spark作业。

时间: 2024-10-22 03:53:13

Spark-Spark Streaming-广告点击的在线黑名单过滤的相关文章

用AJAX跟踪Google Adsense广告点击

adsense|ajax|google|广告 功能可以完整详细地获得每一个用户点击广告的数据.包括点击时间,用户的IP,页面来源地址,被点击的广告网站地址,如果你的站点上保存了用户cookie的话,甚至可以查询到是哪位用户点了你的广告用纯客户端javascript代码和AJAX技术实现点击监听和向服务端发送点击数据,服务端我用的是ASP脚本,只是用来保存点击数据和提供浏览器端的点击查询,可以更换为其他服务端脚本,如PHP,JSP等 数据保存方式为了简单,我用纯text文本保存,一行保存一条数据,

研究公司称互联网广告点击诈骗非常猖獗

据诈骗侦察专业公司Fair Isaac发表的一篇研究报告称,Google和雅虎以及其它在线营销载体发布的互联网广告链接的欺骗性点击率可能远远超出网络运营商所了解的状况. 据redding.com网站报道,这种欺骗性行为包括自动的计算机程序或者诈骗分子没有购买意图而反复点击广告的链接.每一次点击在搜索结果和数千个网站上内容旁边出现的简短的广告都会产生佣金.这种金融公式是产生这种诈骗行为的原因之一. Fair Isaac公司5月18日在旧金山举行的公司会议上将讨论这篇研究报告的初步结论.这个结论将重

业余草推荐阿里妈妈自研广告点击率预估核心算法MLR

业余草推荐阿里妈妈自研广告点击率预估核心算法MLR. 小编觉得CTR(广告点击率)预估的能力对于广告系统的意义和重要性,类似于在证券市场上预测股价的能力,优秀的CTR预测,通向美好和财富...(以下转载内容部分较为干货,文科生不易看懂是正常的,静静地欣赏数学之美即可...) 阿里妈妈国内领先的大数据营销平台,拥有阿里巴巴集团核心商业数据.在这里每天有超过50亿的推广流量完成超过3亿件商品的推广展现,覆盖高达98%的网民,实现数字媒体的一站式触达.在这些鲜亮数字背后,是什么样的核心算法在起作用?如

微软找出广告点击欺诈正体 起诉索赔75万美元

中介交易 http://www.aliyun.com/zixun/aggregation/6858.html">SEO诊断 淘宝客 云主机 技术大厅 历时1年以上的调查后,微软已准备好对广告点击欺诈(Click Fraud)团体提起第一次诉讼. 据美国<纽约时报>报道,微软对三位加拿大居民EricLam.GordonLam和Melanie Suen,以及这类曾进行点击欺诈的团体等,一并于周一(22日)在美国西雅图地方法院提起民事诉讼.微软要求法院判决75万美元以上的损害赔偿.

Google改进广告点击质量增加营收

中介交易 SEO诊断 淘宝客 云主机 技术大厅 网络搜索巨头Google正极力提升其广告点击的质量,来打消投资者对其广告营收可能会不断下滑的担忧.对Google来说,搜索广告点击次数减少的同时营收会增加吗?这是一个投资者.分析人士以及Google自己努力要回答的一个问题. 3月26日,市场研究公司comScore发表的一份最新研究报告显示,Google上月搜索广告的点击次数出现了下降.据comScore透露,Google二月份的广告点击次数较前一个月下降了3%,与去年同期相比也仅增加了3%.一些

关于广告点击率的种种争论

性.宗教和政治.Mac对PC.总有一些话题是我们大家所共同关注的.如果你从事在线营销业务,有一个话题你可能会回避:点击是否存在问题. 自<连线>杂志于1994年http://www.aliyun.com/zixun/aggregation/2952.html">10月发布第一则旗帜广告以来,人们对点击的争论就没有休止过.点击是在线媒体的美好承诺.传统的广告只能通过抽样.研究和其它的问答方式评估,而在线广告则可以计量.有人曾说过,"我一半的广告浪费了,但我不知道是哪一半

从Google Analytics分析AdSense的广告点击行为

Google自从发布了Google分析(Google Analytics)中的AdSense工具后,只要将将AdSense帐号和Analytics帐号绑定,就可以非常方便了AdSense用户查看自己的统计分析数据,是一项非常实用的功能,可以帮助广大的广告用户分析AdSense的广告点击. 其中的"热门AdSense推介"数据统计,可以分析出从外部网站搜索或者跳转过来的用户进行点击所产生的收入,对于优化AdSense和提高AdSense收入具有指导性意义,并且还可以分析出中国不同类型用户

AdSense广告点击格式修改

据AdSense的官方博客报道,Google AdSense目前已经修改了内容广告单元的点击格式,原先的格式是鼠标放在整个区域都可以点击,新的格式是鼠标只放在超级链接的地方才能点击. 据Google声称,这么做的原因是为了改善广告用户体验,减少误点的次数,保证AdWords用户只对有效点击付费,通过减少偶然中的无疑点击,以增加广告联盟的价值和满意度,鼓励发布者更多的消费. 从这些天我的AdSense数据来看,AdSense点击率似乎有一点点下降,但是eCPM看上去似乎也没有得到提高,反而得到了降

Adsense广告点击追踪软件

以前写<Google Adsense常用技巧总结>,曾经提过"广告点击追踪软件",用来跟踪哪些用户从哪个IP地址点击了哪个广告,同时可以对于恶意点击的IP进行追查,今天我就将我正在使用的这个"追踪软件"的修改版发布一下. 这个软件是一个第三方软件,原先地址已经失效,代码本身我也进行了一些修改,之所以要修改,因为原先程序在权限控制上有严重缺陷,即使我这里进行了一些简单的修改,可能权限控制还是有一些问题.大家如果有时间可以再来修改一下这个代码. 原先的问题在