Spark踩坑记:共享变量

前言

前面总结的几篇spark踩坑博文中,我总结了自己在使用spark过程当中踩过的一些坑和经验。我们知道Spark是多机器集群部署的,分为Driver/Master/Worker,Master负责资源调度,Worker是不同的运算节点,由Master统一调度。

而Driver是我们提交Spark程序的节点,并且所有的reduce类型的操作都会汇总到Driver节点进行整合。节点之间会将map/reduce等操作函数传递一个独立副本到每一个节点,这些变量也会复制到每台机器上,而节点之间的运算是相互独立的,变量的更新并不会传递回Driver程序。

那么有个问题,如果我们想在节点之间共享一份变量,比如一份公共的配置项,该怎么办呢?Spark为我们提供了两种特定的共享变量,来完成节点间变量的共享。 本文首先简单的介绍spark以及spark streaming中累加器和广播变量的使用方式,然后重点介绍一下如何更新广播变量。

累加器

顾名思义,累加器是一种只能通过关联操作进行“加”操作的变量,因此它能够高效的应用于并行操作中。它们能够用来实现counters和sums。Spark原生支持数值类型的累加器,开发者可以自己添加支持的类型,在2.0.0之前的版本中,通过继承AccumulatorParam来实现,而2.0.0之后的版本需要继承AccumulatorV2来实现自定义类型的累加器。

如果创建了一个具名的累加器,它可以在spark的UI中显示。这对于理解运行阶段(running stages)的过程有很重要的作用。如下图:

在2.0.0之前版本中,累加器的声明使用方式如下:


  1. scala> val accum = sc.accumulator(0, "My Accumulator") 
  2. accum: spark.Accumulator[Int] = 0 
  3.  
  4. scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) 
  5. ... 
  6. 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s 
  7.  
  8. scala> accum.value 
  9. res2: Int = 10 

累加器的声明在2.0.0发生了变化,到2.1.0也有所变化,具体可以参考官方文档,我们这里以2.1.0为例将代码贴一下:


  1. scala> val accum = sc.longAccumulator("My Accumulator") 
  2. accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0) 
  3.  
  4. scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) 
  5.  
  6. 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s 
  7.  
  8. scala> accum.value 
  9. res2: Long = 10 

广播变量

累加器比较简单直观,如果我们需要在spark中进行一些全局统计就可以使用它。但是有时候仅仅一个累加器并不能满足我们的需求,比如数据库中一份公共配置表格,需要同步给各个节点进行查询。OK先来简单介绍下spark中的广播变量:

广播变量允许程序员缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝。例如,利用广播变量,我们能够以一种更有效率的方式将一个大数据量输入集合的副本分配给每个节点。Spark也尝试着利用有效的广播算法去分配广播变量,以减少通信的成本。

一个广播变量可以通过调用SparkContext.broadcast(v)方法从一个初始变量v中创建。广播变量是v的一个包装变量,它的值可以通过value方法访问,下面的代码说明了这个过程:


  1. scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) 
  2. broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) 
  3.  
  4. scala> broadcastVar.value 
  5. res0: Array[Int] = Array(1, 2, 3) 

从上文我们可以看出广播变量的声明很简单,调用broadcast就能搞定,并且scala中一切可序列化的对象都是可以进行广播的,这就给了我们很大的想象空间,可以利用广播变量将一些经常访问的大变量进行广播,而不是每个任务保存一份,这样可以减少资源上的浪费。

更新广播变量(rebroadcast)

广播变量可以用来更新一些大的配置变量,比如数据库中的一张表格,那么有这样一个问题,如果数据库当中的配置表格进行了更新,我们需要重新广播变量该怎么做呢。上文对广播变量的说明中,我们知道广播变量是只读的,也就是说广播出去的变量没法再修改,那么我们应该怎么解决这个问题呢?

答案是利用spark中的unpersist函数

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.

上文是从spark官方文档摘抄出来的,我们可以看出,正常来说每个节点的数据是不需要我们操心的,spark会自动按照LRU规则将老数据删除,如果需要手动删除可以调用unpersist函数。

那么更新广播变量的基本思路:将老的广播变量删除(unpersist),然后重新广播一遍新的广播变量,为此简单包装了一个用于广播和更新广播变量的wraper类,如下:


  1. import java.io.{ ObjectInputStream, ObjectOutputStream } 
  2. import org.apache.spark.broadcast.Broadcast 
  3. import org.apache.spark.streaming.StreamingContext 
  4. import scala.reflect.ClassTag 
  5.  
  6. // This wrapper lets us update brodcast variables within DStreams' foreachRDD 
  7. // without running into serialization issues 
  8. case class BroadcastWrapper[T: ClassTag]( 
  9.     @transient private val ssc: StreamingContext, 
  10.     @transient private val _v: T) { 
  11.  
  12.   @transient private var v = ssc.sparkContext.broadcast(_v) 
  13.  
  14.   def update(newValue: T, blocking: Boolean = false): Unit = { 
  15.     // 删除RDD是否需要锁定 
  16.     v.unpersist(blocking) 
  17.     v = ssc.sparkContext.broadcast(newValue) 
  18.   } 
  19.  
  20.   def value: T = v.value 
  21.  
  22.   private def writeObject(out: ObjectOutputStream): Unit = { 
  23.     out.writeObject(v) 
  24.   } 
  25.  
  26.   private def readObject(in: ObjectInputStream): Unit = { 
  27.     v = in.readObject().asInstanceOf[Broadcast[T]] 
  28.   } 

利用该wrapper更新广播变量,大致的处理逻辑如下:


  1. // 定义 
  2. val yourBroadcast = BroadcastWrapper[yourType](ssc, yourValue) 
  3.  
  4. yourStream.transform(rdd => { 
  5.   //定期更新广播变量 
  6.   if (System.currentTimeMillis - someTime > Conf.updateFreq) { 
  7.     yourBroadcast.update(newValue, true) 
  8.   } 
  9.   // do something else 
  10. }) 

总结

spark中的共享变量是我们能够在全局做出一些操作,比如record总数的统计更新,一些大变量配置项的广播等等。而对于广播变量,我们也可以监控数据库中的变化,做到定时的重新广播新的数据表配置情况,另外我使用上述方式,在每天千万级的数据实时流统计中表现稳定,所以有相似问题的同学也可以进行尝试,有任何问题,欢迎随时骚扰沟通。

本文作者:肖力涛

来源:51CTO

时间: 2024-08-07 17:35:36

Spark踩坑记:共享变量的相关文章

【踩坑记】从HybridApp到ReactNative

前言 随着移动互联网的兴起,Webapp开始大行其道.大概在15年下半年的时候我接触到了HybridApp.因为当时还没毕业嘛,所以并不清楚自己未来的方向,所以就投入了HybridApp的怀抱. HybridApp最早好像是国外的PhoneGap,然后国内有AppCan.Dcloud.APICloud等等.我当时接触的是APICloud,相比于其他平台,APICloud最大的特点是它的混合程度比较高! 要知道,Webapp最大的问题就是性能问题始终无法和原生App相比,由此才发展出来Hybrid

Docker踩坑记

看最近Docker这么火,也跟着用用.结果从一个坑出来又掉另一个坑去了. FATA[0000] Error response from daemon: client and server don't have same version (client : 1.16, server: 1.15) 1 [root@aliyun ~]# docker info 2 FATA[0000] Error response from daemon: client and server don't have s

总结!一个产品新手的踩坑记

本文讲的是总结!一个产品新手的踩坑记, 加入产品大家庭正好三个月了,在大佬们面前不敢造次,于是,觉得可以拿出来分享的,便是在这段春夏交替之光里,亲脚踩的一些坑,以及在坑里获得的一些小发现.所幸的是,在老大和运营设计开发同学们的帮助下,目前还没有摔成骨折,还望日后能留全尸. 踩坑装备 没点装备怎么敢出门打怪.需求讨论会上记录好被提出的需求.其内容.提出者,为之后踩坑.填坑做好准备.对方攻击double没关系,首先血要厚.(实践证明,这些装备在日后我明确需求以及设计完成后自我检查的过程中帮了大忙).

Android Studio踩坑记

拾起Android项目,需要使用Goolgle Play Services.顺应潮流换了Android Studio,开启了踩坑之旅. 尝试直接将Eclipse项目导入AS,结果根本没法用啊.正确的方法应该是升级ADT,在Eclipse下导出build.gradle然后再导入.但是升级的时间还不如直接新建项目把资源拷进去,同时也能了解一下AS默认的项目结构. 第一个遇到的问题是新建的项目没有assert和lib目录.java和res等资源都在src/main目录下,于是我将assets和libs

Spark SQL 用户自定义函数UDF、用户自定义聚合函数UDAF 教程(Java踩坑教学版)

在Spark中,也支持Hive中的自定义函数.自定义函数大致可以分为三种: UDF(User-Defined-Function),即最基本的自定义函数,类似to_char,to_date等 UDAF(User- Defined Aggregation Funcation),用户自定义聚合函数,类似在group by之后使用的sum,avg等 UDTF(User-Defined Table-Generating Functions),用户自定义生成函数,有点像stream里面的flatMap 本篇

曾经踩坑党,如今护航忙 | 袋鼠云的双11故事之一

普通人提起双11,谈的都是剁手党 袋鼠云提起双11,谈的却是踩坑党 每年双11,同样的通宵达旦.同样的激动万分.同样的心跳加速,同样的肾上腺素增加,不一样的是:剁手党在Happy,踩坑党在忧虑. 这个双11,袋鼠小妹采访了曾经参与过阿里双11的几位袋鼠云技术专家,为大家分享他们别样的双11故事.他们分别是袋鼠云首席大数据架构师申杭.首席数据库架构师俊达(大家尊称:达叔),首席运维专家留良.首席售后服务专家南晨.(恩,都是首席,Teamleader级别) 袋鼠小妹有故事,那你准备好酒了么? ---

SQL Server在AlwaysOn中使用内存表的“踩坑”记录

前言 最近因为线上alwayson环境的一个数据库上使用内存表.经过大概一个星期监控程序发现了一个非常严重问题这个数据库的日志文件不会截断,已用空间一直在增加(存在定时的每个小时的日志备份),同时内存表数据库文件也无法删除,下面就介绍一下后面我的处理过程,话不多说了,来一起看看详细的介绍吧. 数据库:SQL Server2014 Enterprise Edition (64-bit) 删除文件 使用一个单独非alwayson环境的数据库测试. 一.创建内存表 ---创建内存表文件组 ALTER

秦苍科技是如何管理数百个微服务并避免踩坑的?

[编者的话]过去两年中,微服务架构是一个非常热门的技术名词.秦苍科技也在微服务方面做了大量的投资和实践,我们有开发.测试.准生产.生产四套环境,每套环境有230+个微服务,总共有近1000个微服务. 本文讲的是秦苍科技是如何管理数百个微服务并避免踩坑的?秦苍科技为什么要采用微服务的架构?如何管理这么多微服务?本文将对这些问题进行阐述,希望对正在踩坑路上和即将踩坑的朋友们有所帮助. 为什么要使用微服务 关于微服务架构优点有很多讨论.但是,个人认为许多优点都可以算作一些"伪优点".例如:

【踩坑经历】一次Asp.NET小网站部署踩坑和解决经历

2013年给1个大学的小客户部署过一个小型的Asp.NET网站,非常小,用的sqlite数据库,今年人家说要换台服务器,要重新部署一下,好吧,虽然早就过了服务时间,但无奈谁叫人家是客户了,二话不说,上,源代码和以前的文件都有,部署还不是分分钟的事情,打开IIS挂上去就行了.谁知道,这个部署将近花了2天的时间.看看踩坑过程和解决方法. 本文原文地址:http://www.cnblogs.com/asxinyu/p/4380380.html 回来一看,9个反对,我心痛啊,这些童鞋,你们觉得这篇文章哪