Apache Spark源码走读(八)Graphx实现剖析&spark repl实现详解

<一>Graphx实现剖析

概要

图的并行化处理一直是一个非常热门的话题,这里头的重点有两个,一是如何将图的算法并行化,二是找到一个合适的并行化处理框架。Spark作为一个非常优秀的并行处理框架,将一些并行化的算法移到其上面就成了一个很自然的事情。

Graphx是一些图的常用算法在Spark上的并行化实现,同时提供了丰富的API接口。本文就Graphx的代码架构及pagerank在graphx中的具体实现做一个初步的学习。

Google为什么赢得了搜索引擎大战

当Google还在起步的时候,在搜索引擎领域,Yahoo正如日中天,红的发紫。显然,在Google面前的是一堵让人几乎没有任何希望的墙。

但世事难料,现在“外事问谷歌”成了不争的事实,Yahoo应也陪客了。

这种转换到底是如何形成的了,有一个因素是这样的,那就是Google发明了显著提高搜索准确率的PageRank算法。如果说PageRank算法的提出让谷歌牢牢站稳了搜索引擎大战的脚跟,这是毫不夸张的。

搜索引擎有几个要考虑的关键因素(个人观点而已)。

  1. 要想吸引用户,就必须要有出色的搜索准确率
  2. 有了用户,才能做广告投放,提高广告投放的针对性就可以盈利

上述两个方面都有非常优秀的算法。

废话少述,回到正题。PageRank算法是图论的一个具体应用,ok, 转到图论。

图论简介

图的组成

离散数学中非常重要的一个部分就是图论,下面是一个无向连通图:

顶点(vertex)

上图中的A,B,C,D,E称为图的顶点。

顶点与顶点之间的连线称之为边。

图的数学表示

读大学的时候,一直没有想明白为什么要学劳什子的线性代数。直到这两天看《数学之美》一书时,才发觉,线性代数在一些计算机应用领域,那简直就是不可或缺啊。

我们比较容易理解的平面几何和立体几何(一个是二维,一个是三维),而线性代数解决的其实是一个高维问题,由于无法直觉的感受到,所以很难。如果想比较通俗的理解一下数学为什么有这么多的分支及其内在关联,强烈推荐读一下《数学桥 对高等数学的一次观赏之旅》

在数学中,用什么来表示图呢,答案就是线性代数里面的矩阵,想想看,图的关联矩阵,图的邻接矩阵。总之就是矩阵啦,线性代数一下子有用了。下面是一个具体的例子。

图的并行化处理

刚才说到图可以用矩阵来表示,图的并行化问题在某种程度上就被转化为矩阵运算的并行化问题。

那么以矩阵的乘法为例,看看其是否可以并行化处理。

以矩阵 A X B 为例,说明并行化处理过程。

将上述的矩阵A和B划分为四个部分,如下图所示:

首次对齐之后:

子矩阵相乘

相乘之后,A的子矩阵左移,B的子矩阵上移

 

计算结果合并

 

图的并行化处理框架,从Pregel说起

上一节的重点有两点:

  1. 图用矩阵来表示,对图的运算就是矩阵的运算
  2. 矩阵乘法运算可以并行化,动态演示其并行化的原理

你说ok,我明白了。哪有没有一种合适的并行化处理框架可以用来进行图的计算呢,那你肯定想到了MapReduce。

MapReduce尽管也是一个不错的并行化处理框架,但在图计算方面,有许多缺点,主要是计算的中间过程需要存储到硬盘,效率很低。

Google针对图的并行处理,专门提出了一个了不起的框架Pregel。其执行时的动态视图如下所示。

Pregel有如下优点

  • 级联可扩性好 scalability
  • 容错性强
  • 能够很好的表示各种图的常用算法

 

 

Pregel的计算模型

计算模型如下图所示,重要的有三个:

  1. 作用于每个顶点的处理逻辑 vertexProgram
  2. 消息发送,用于相邻节点间的通讯 sendMessage
  3. 消息合并逻辑 messageCombining

Pregel在Spark中的实现

非常感谢你能坚持看到现在,这篇博客内容很多,有点难。我想还是上一幅图将其内在逻辑整一下再继续说下去。

该图要表示的意思是这样的,Graphx利用了Spark这样了一个并行处理框架来实现了图上的一些可并行化执行的算法。

本篇博客要表达的意思就是上面加红的这句话,请诸位看官仔细理解。

  • 算法是否能够并行化与Spark本身无关
  • 算法并行化与否的本身,需要通过数学来证明
  • 已经证明的可并行化算法,利用Spark来实现会是一个错的选择,因为Graphx支持pregel的图计算模型

Graphx中的重要概念

Graph

毫无疑问,图本身是graphx中一个非常重要的概念。

成员变量

graph中重要的成员变量分别为

  1. vertices
  2. edges
  3. triplets

为什么要引入triplets呢,主要是和Pregel这个计算模型相关,在triplets中,同时记录着edge和vertex. 具体代码就不罗列了。

成员函数

函数分成几大类:

  1. 对所有顶点或边的操作,但不改变图结构本身,如mapEdges, mapVertices
  2. 子图,类似于集合操作中的filter subGraph
  3. 图的分割,即paritition操作,这个对于Spark计算来说,很关键,正是因为有了不同的Partition,才有了并行处理的可能, 不同的PartitionStrategy,其收益不同。最容易想到的就是利用Hash来将整个图分成多个区域。
  4. outerJoinVertices 顶点的外连接操作

图的运算和操作 GraphOps

图的常用算法是集中抽象到GraphOps这个类中,在Graph里作了隐式转换,将Graph转换为GraphOps

implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag]
      (g: Graph[VD, ED]): GraphOps[VD, ED] = g.ops

支持的操作如下

  1. collectNeighborIds
  2. collectNeighbors
  3. collectEdges
  4. joinVertices
  5. filter
  6. pickRandomVertex
  7. pregel
  8. pageRank
  9. staticPageRank
  10. connectedComponents
  11. triangleCount
  12. stronglyConnectedComponents

RDD

RDD是Spark体系的核心,那么Graphx中引入了哪些新的RDD呢,有俩,分别为

  1. VertexRDD
  2. EdgeRDD

较之EdgeRdd,VertexRDD更为重要,其上的操作也很多,主要集中于Vertex之上属性的合并,说到合并就不得不扯到关系代数和集合论,所以在VertexRdd中能看到许多类似于sql中的术语,如

  • leftJoin
  • innerJoin

至于leftJoin, innerJoin, outerJoin的区别,建议谷歌一下,不再赘述。

Graphx场景分析

图的存储和加载

在进行数学计算的时候,图用线性代数中的矩阵来表示,那么如何进行存储呢?

学数据结构的时候,老师肯定说过好多的办法,不再啰嗦了。

不过在大数据的环境下,如果图很巨大,表示顶点和边的数据不足以放在一个文件中怎么办? 用HDFS

加载的时候,一台机器的内存不足以容下怎么办? 延迟加载,在真正需要数据时,将数据分发到不同机器中,采用级联方式。

一般来说,我们会将所有与顶点相关的内容保存在一个文件中vertexFile,所有与边相关的信息保存在另一个文件中edgeFile。

生成某一个具体的图时,用edge就可以表示图中顶点的关联关系,同时图的结构也表示出来了。

GraphLoader

graphLoader是graphx中专门用于图的加载和生成,最重要的函数就是edgeListFile,定义如下。

def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
    : Graph[Int, Int] =
  {
    val startTime = System.currentTimeMillis

    // Parse the edge data table directly into edge partitions
    val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
    val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
      val builder = new EdgePartitionBuilder[Int, Int]
      iter.foreach { line =>
        if (!line.isEmpty && line(0) != '#') {
          val lineArray = line.split("\\s+")
          if (lineArray.length < 2) {
            logWarning("Invalid line: " + line)
          }
          val srcId = lineArray(0).toLong
          val dstId = lineArray(1).toLong
          if (canonicalOrientation && srcId > dstId) {
            builder.add(dstId, srcId, 1)
          } else {
            builder.add(srcId, dstId, 1)
          }
        }
      }
      Iterator((pid, builder.toEdgePartition))
    }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
    edges.count()

    logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
      vertexStorageLevel = vertexStorageLevel)
  } // end of edgeListFile

 

应用举例之PageRank

什么是PageRank

PageRank是Google专有的算法,用于衡量特定网页相对于搜索引擎索引中的其他网页而言的重要程度。它由Larry Page 和 Sergey Brin在20世纪90年代后期发明。PageRank实现了将链接价值概念作为排名因素。

PageRank将对页面的链接看成投票,指示了重要性。

pageRank的核心思想

”在互联网上,如果一个网页被很多其它网页所链接,说明它受到普遍的承认和依赖,那么它的排名就很高。“  (摘自数学之美第10章)

你说这也太简单了吧,不是跟没说一个样吗,怎么用数学来表示呢?

呵呵,起初我也这么想的,后来多看了几遍之后,明白了一点点。分析步骤用文字表述如下,

  1. 网页和网页之间的关系用图来表示
  2. 网页A和网页B之间的连接关系表示任意一个用户从网页A到转到网页B的可能性(概率)
  3. 所有网页的排名用一维向量来B来表示

所有网页之间的连接用矩阵A来表示,所有网页排名用B来表示。

pageRank如何进行并行化

好了,上面的数学阐述说明了“网页排名的计算可以最终抽象为矩阵相乘”,而在开始的时候已经证明过矩阵相乘可以并行化处理

理论研究结束了,接下来的就是工程实现了,借用Pregel模型,PageRank中定义的各主要函数分别如下。

vertexProgram

def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
      val (oldPR, lastDelta) = attr
      val newPR = oldPR + (1.0 - resetProb) * msgSum
      (newPR, newPR - oldPR)
    }

sendMessage

def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
      if (edge.srcAttr._2 > tol) {
        Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
      } else {
        Iterator.empty
      }
    }

messageCombiner

def messageCombiner(a: Double, b: Double): Double = a + b

一点点启示

通过pageRank这个例子,我们能够搞清楚如何将平素学习的数学理论用以解决实际问题。

“学习的东西总是有价值的,至于用的上用不上,全靠造化了”

完整代码

// Connect to the Spark cluster
val sc = new SparkContext("spark://master.amplab.org", "research")
// Load my user data and parse into tuples of user id and attribute list
val users = (sc.textFile("graphx/data/users.txt")
  .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
// Parse the edge data which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
// Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {
  case (uid, deg, Some(attrList)) => attrList
  // Some users may not have attributes so we set them as empty
  case (uid, deg, None) => Array.empty[String]
}
// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
// Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.001)
// Get the attributes of the top pagerank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
  case (uid, attrList, Some(pr)) => (pr, attrList.toList)
  case (uid, attrList, None) => (0.0, attrList.toList)
}

println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))

小结

本篇讲来讲去就在强调一个问题,Spark是一个分布式并行计算框架。能不能用Spark,其实大体取决于问题的数学模型本身,如果可以并行化处理,则用之,切不可削足适履。

另一个用张图来总结一下提到的数学知识吧。

 

再一次强烈推荐《数学桥》

参考资料

  1. 《数学之美》
  2. 《数学桥 高等数学的观赏之旅》
  3. 《大数据》

<二>spark repl实现详解

概要

之所以对spark shell的内部实现产生兴趣全部缘于好奇代码的编译加载过程,scala是需要编译才能执行的语言,但提供的scala repl可以实现代码的实时交互式执行,这是为什么呢?

既然scala已经提供了repl,为什么spark还要自己单独搞一套spark repl,这其中的缘由到底何在?

显然,这些都是问题,要解开这些谜团,只有再次开启一段源码分析之旅了。

全局视图

 

上图显示了java源文件从编译到加载执行的全局视图,整个过程中最主要的步骤是:

  1. 编译成过程,由编译器对java源文件进行编译整理,生成java bytecodes
  2. 类的加载和初始化,主要由classloader参与
  3. 执行引擎 将字节码翻译成机器码,然后调度执行

这一部分的内容,解释的非常详细的某过于《深入理解jvm》和撒迦的JVM分享,这里就不班门弄斧了。

那么讲上述这些内容的目的又何在呢,我们知道scala也是需要编译执行的,那么编译的结果是什么样呢,要符合什么标准?在哪里执行。

答案比较明显,scala源文件也需要编译成java bytecodes,和java的编译结果必须符合同一份标准,生成的bytecode都是由jvm的执行引擎转换成为机器码之后调度执行。

也就是说尽管scala和java源文件的编译器不同,但它们生成的结果必须符合同一标准,否则jvm无法正确理解,执行也就无从谈起。至于scala的编译器是如何实现的,文中后续章节会涉及。

ELF可执行文件的加载和运行

”CPU是很傻的,加电后,它就会一直不断的读取指令,执行指令,不能停的哦。“ 如果有了这个意识,看源码的时候你就会有无穷的疑惑,无数想不通的地方,这也能让你不断的进步。

再继续讲scala源文件的编译细节之前,我们还是来温习一下基础的内容,即一个EFL可执行文件是如何加载到内存真正运行起来的。(本篇博客的内容相对比较底层,很费脑子的,:)

Linux平台上基本采用ELF作为可执行文件的格式,java可执行文件本身也是ELF格式的,使用file指令来作检验。

file /opt/java/bin/java

下面是输出的结果,从结果中可以证实java也是ELF格式。

/opt/java/bin/java: ELF 64-bit LSB executable, x86-64, version 1 (SYSV), dynamically linked (uses shared libs), for GNU/Linux 2.6.9, BuildID[sha1]=bd74b7294ebbdd93e9ef3b729e5aab228a3f681b, stripped

ELF文件的执行过程大致如下

  1. fork创建一个进程
  2. 调用execve来执行ELF
  3. ELF的加载过程中会有动态加载和链接发生
  4. 全局变量的初始化,这一部分和glibc相关
  5. 执行main函数

我讲述的过程非常笼统,要想更清楚的了解细节,请参阅《深入理解Linux内核》中的程序的执行一章,或是《深入Linux内核架构》中的启动新程序一节。

现在打开内核中相应的源码,看看execve函数是如何找到elf格式的处理句柄的。

第一步:每一种二进制格式,必须先注册自己的处理句柄。

在文件$KERNEL_HOME/fs/binfmt_elf.c中,init_elf_binfmt函数就实现了注册任务

static int __init init_elf_binfmt(void)
{
  register_binfmt(&elf_format);
  return 0;
}

来看一看elf_format的定义是什么

static struct linux_binfmt elf_format = {
  .module		= THIS_MODULE,
  .load_binary	= load_elf_binary,
  .load_shlib	= load_elf_library,
  .core_dump	= elf_core_dump,
  .min_coredump	= ELF_EXEC_PAGESIZE,
};

第二步:搜索处理句柄,fs/exec.c

execve是一个系统调用,内核中对应的函数是do_execve,具体代码不再列出。

do_execve->do_execve_common->search_binary_hander

注意search_binary_handler会找到上一步中注册的binary_handler即elf_format,找到了对应的handler之后,关键的一步就是load_binary了。动态链接过程调用的是load_shlib,这一部分的内容细细展开的话,够写几本书了。

search_binary_handler的部分代码

retry:
  read_lock(&binfmt_lock);
  list_for_each_entry(fmt, &formats, lh) {
    if (!try_module_get(fmt->module))
      continue;
    read_unlock(&binfmt_lock);
    bprm->recursion_depth++;
    retval = fmt->load_binary(bprm);
    bprm->recursion_depth--;
    if (retval >= 0 || retval != -ENOEXEC ||
        bprm->mm == NULL || bprm->file == NULL) {
      put_binfmt(fmt);
      return retval;
    }
    read_lock(&binfmt_lock);
    put_binfmt(fmt);
  }
  read_unlock(&binfmt_lock);

 要想对这一部分内容有个比较清楚的了解,建议看一下台湾黄敬群先生的《深入浅出Helloworld》和国内出版的《程序员的自我修养》

另外一个值得推荐的是黑客级的网站phrack.org,可惜现在不更新了。

之所以讲ELF的加载和运行,是因为要打通java源文件的编译执行过程的话,必然会步步深入到此,其实到这还不够,再往下走就是CPU指令,只有到达CPU指令才算真正到底。这个时候就需要去读intel ia-64 software programmer guide了。

 源码走读其实只是个形式,重要的是能理清楚其执行流程,以到达指令级的理解为最佳。

Java类的加载和执行

在各位java达人面前,我就不显示自己java水平有多烂了。只是将两幅最基本的图搬出来,展示一下java类的加载过程,以及classloader的层次关系。记住这些东东会为我们在后头讨论scala repl奠定良好基础。

序列化和反序列化

Java体系中,另一个重要的基石就是类的序列化和反序列化。这里要注意的就是当有继承体系时,类的序列化和反序列化顺序,以及类中有静态成员变量的时候,如何处理序列化。诸如此类的文章,一搜一大把,我再多加解释实在是画蛇添足,列出来只是说明其重要性罢了。

spark-shell的执行路径

前面进行了这么多的铺垫之后,我想可以进入正题了。即spark-shell的执行调用路径到底怎样。

首次使用Spark一般都是从执行spark-shell开始的,当在键盘上敲入spark-shell并回车时,后面究竟发生了哪些事情呢?

export SPARK_SUBMIT_OPTS
$FWDIR /bin/spark - submit spark -shell "$@" --class org.apache.spark.repl.Main

可以看出spark-shell其实是对spark-submit的一层封装,但事情到这还没有结束,毕竟还没有找到调用java的地方,继续往下搜索看看spark-submit脚本的内容。

exec $SPARK_HOME /bin/spark -class org. apache .spark.
deploy . SparkSubmit "${ ORIG_ARGS [@]}"

离目标越来越近了,spark-class中会调用到java程序,与java相关部分的代码摘录如下

# Find the java binary
if [ -n "${ JAVA_HOME }" ]; then
RUNNER ="${ JAVA_HOME }/ bin/java"
else
if [ `command -v java ` ]; then
RUNNER ="java"
else
echo " JAVA_HOME is not set" >&2
exit 1
fi
fi
exec " $RUNNER " -cp " $CLASSPATH " $JAVA_OPTS "$@"

SparkSubmit当中定义了Main函数,在它的处理中会将spark repl运行起来,spark repl能够接收用户的输入,通过编译与运行,返回结果给用户。这就是为什么spark具有交互处理能力的原因所在。调用顺序如下

  1. SparkSubmit
  2. repl.Main
  3. SparkILoop

利用jvisualvm验证

修改spark-class,使得JAVA_OPTS看起来如下图所示:

JMX_OPTS="-Dcom.sun.management.jmxremote.port=8300 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=127.0.0.1"
# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="-XX:MaxPermSize=128m $OUR_JAVA_OPTS $JMX_OPTS"
JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"

修改完上述脚本之后先启动spark-shell,然后再启动jvisualvm

bin/spark-shell
jvisualvm

 在Java VisualVM中选择进程org.apache.spark.deploy.SparkSubmit,如果已经为jvisualvm安装了插件Threads Inspector,其界面将会与下图很类似:

 在右侧选择“线程”这一tab页,选择线程main,然后可以看到该线程的thread dump信息

spark repl vs. scala repl

既然scala已经提供了repl, spark还是要自己去实现一个repl,你不觉着事有可疑么?我谷歌了好长时间,终于找到了大神的讨论帖子,不容易啊,原文摘录如下。

Thanks for looping me in! Just FYI, I would also be okay if instead of making the wrapper code pluggable, the REPL just changed to one based on classes, as in Prashant's example, rather than singleton objects.

 

To give you background on this, the problem with the "object" wrappers is that initialization code goes into a static initializer that will have to run on all worker nodes, making the REPL unusable with distributed applications. As an example, consider this:

 

// file.txt is a local file on just the master

val data = scala.io.Source.fromFile("file.txt").mkString

 

// now we use the derived string, "data", in a closure that runs on the cluster

spark.textFile.map(line => doStuff(line, data))

 

The current Scala REPL creates an object Line1 whose static initializer sets data with the code above, then does import Line1.data in the closure, which will cause the static initializer to run *again* on the remote node and fail. This issue definitely affects Spark, but it could also affect other interesting projects that could be built on Scala's REPL, so it may be an interesting thing to consider supporting in the standard interpreter.

 

Matei

上述内容估计第一次看了之后,除了一头雾水还是一头雾水。翻译成为白话就是利用scala原生的repl,是使用object来封装输入的代码的,这有什么不妥,“序列化和反序列化”的问题啊。反序列化的过程中,对象的构造函数会被再次调用,而这并不是我们所期望的。我们希望生成class而不是object,如果你不知道object和class的区别,没关系,看一下scala的简明手册,马上就明白了。

最重要的一点:Scala Repl默认输入的代码都是在本地执行,故使用objectbasedwraper是没有问题的。但在spark环境下,输入的内容有可能需要在远程执行,这样objectbasedwrapper的源码生成方式经序列化反序列化会有相应的副作用,导致出错不可用。

讨论详情,请参考该Link https://groups.google.com/forum/#!msg/scala-internals/h27CFLoJXjE/JoobM6NiUMQJ

scala repl执行过程

再啰嗦一次,scala是需要编译执行的,而repl给我们的错觉是scala是解释执行的。那我们在repl中输入的语句是如何被真正执行的呢?

简要的步骤是这样的

  1. 在repl中输入的每一行语句,都会被封装为一个object, 这一工作主要由interpreter完成
  2. 对该object进行编译
  3. 由classloader加载编译后的java bytecode
  4. 执行引擎负责真正执行加载入内存的bytecode

interpreter in scala repl

那么怎么证明我说的是对的呢?很简单,做个实验,利用下述语句了启动scala repl

scala -Dscala.repl.debug=true

 如果我们输入这样一条语句 val c = 10,由interpreter生成的scala源码会如下所列

object $read extends scala.AnyRef {
  def () = {
    super.;
    ()
  };
  object $iw extends scala.AnyRef {
    def () = {
      super.;
      ()
    };
    object $iw extends scala.AnyRef {
      def () = {
        super.;
        ()
      };
      val c = 10
    }
  }
}

注意啰,是object哦,不是class

interpreter in spark repl

那我们再看看spark repl生成的scala源码是什么样子的?

启动spark-shell之前,修改一下spark-class,在JAVA_OPTS中加入如下内容

-Dscala.repl.debug=true

启动spark-shell,输入val b = 10,生成的scala源码如下所示

class $read extends AnyRef with Serializable {
    def (): $line10.$read = {
      $read.super.();
      ()
    };
    class $iwC extends AnyRef with Serializable {
      def (): $read.this.$iwC = {
        $iwC.super.();
        ()
      };
      class $iwC extends AnyRef with Serializable {
        def (): $iwC = {
          $iwC.super.();
          ()
        };
        import org.apache.spark.SparkContext._;
        class $iwC extends AnyRef with Serializable {
          def (): $iwC = {
            $iwC.super.();
            ()
          };
          class $iwC extends AnyRef with Serializable {
            def (): $iwC = {
              $iwC.super.();
              ()
            };
            private[this] val b: Int = 100;
              def b: Int = $iwC.this.b
          };
          private[this] val $iw: $iwC = new $iwC.this.$iwC();
            def $iw: $iwC = $iwC.this.$iw
        };
        private[this] val $iw: $iwC = new $iwC.this.$iwC();
          def $iw: $iwC = $iwC.this.$iw
      };
      private[this] val $iw: $iwC = new $iwC.this.$iwC();
        def $iw: $iwC = $iwC.this.$iw
    };
    private[this] val $iw: $read.this.$iwC = new $read.this.$iwC();
      def $iw: $read.this.$iwC = $read.this.$iw
  };
  object $read extends scala.AnyRef with Serializable {
    def (): $line10.$read.type = {
      $read.super.();
      ()
    };
    private[this] val INSTANCE: $line10.$read = new $read();
      def INSTANCE: $line10.$read = $read.this.INSTANCE;
     private def readResolve(): Object = $line10.this.$read
  }
}

注意到与scala repl中的差异了么,此处是class而非object

IMain.scala vs. SparkIMain.scala

是什么导致有上述的差异的呢?我们可以下载scala的源码,对是scala本身的源码在github上可以找到。interpreter中代码生成部分的处理逻辑主要是在IMain.scala,在spark中是SparkIMain.scala。

比较两个文件的异同。

gvimdiff IMain.scala SparkIMain.scala

gvimdiff是个好工具,两个文件的差异一目了然,emacs和vim总要有一样玩的转才行啊。来个屏幕截图吧,比较炫吧。

注:spark开发团队似乎给scala的开发小组提了一个case,在最新的scala中似乎已经支持classbasedwrapper,可以通过现应的选项来设置来选择classbasedwraper和objectbasedwrapper.

下述代码见最新版scala,scala-2.12.x中的IMain.scala:

  private lazy val ObjectSourceCode: Wrapper =
      if (settings.Yreplclassbased) new ClassBasedWrapper else new ObjectBasedWrapper

compiler

scala实现了自己的编译器,处理逻辑的代码实现见scala源码中的src/compiler目录下的源文件。有关其处理步骤不再赘述,请参考ref3,ref4中的描述。

有一点想要作个小小提醒的时,当你看到SparkIMain.scala中有new Run的语句却不知道这个Run在哪的时候,兄弟跟你讲在scala中的Global.scala里可以找到, :)

小结

编译和加载是一个非常有意思的话题,即可以说是很基础也可以说很冷门,有无动力就这部分进行深究,就看个人的兴趣了。

参考资料

  1. http://phrack.org/issues/58/5.html
  2. http://blog.linux.org.tw/~jserv/archives/002125.html
  3. http://lampwww.epfl.ch/~magarcia/ScalaCompilerCornerReloaded/2012Q2/GenASM.pdf
  4. https://wiki.scala-lang.org/display/SIW/Overview+of+Compiler+Phases
  5. 《深入理解Linux内核》
  6. 《深入Linux内核架构》
  7. 《深入理解Java虚拟机》
时间: 2024-09-06 00:25:43

Apache Spark源码走读(八)Graphx实现剖析&spark repl实现详解的相关文章

Apache Spark源码走读(七)Standalone部署方式分析&amp;sql的解析与执行

<一>Standalone部署方式分析 楔子 在Spark源码走读系列之2中曾经提到Spark能以Standalone的方式来运行cluster,但没有对Application的提交与具体运行流程做详细的分析,本文就这些问题做一个比较详细的分析,并且对在standalone模式下如何实现HA进行讲解. 没有HA的Standalone运行模式 先从比较简单的说起,所谓的没有ha是指master节点没有ha. 组成cluster的两大元素即Master和Worker.slave worker可以有

Apache Spark源码走读

http://www.aliyun.com/zixun/aggregation/13383.html">Spark是发源于美国加州大学伯克利分校AMPLab的集群计算平台,它立足于内存计算,性能超过Hadoop百倍,即使使用磁盘,迭代类型的计算也会有10倍速度的提升.Spark从多迭代批量处理出发,兼收并蓄数据仓库.流处理和图计算等多种计算范式,是罕见的全能选手.Spark当下已成为Apache基金会的顶级开源项目,拥有着庞大的社区支持--活跃开发者人数已超过Hadoop MapReduc

Apache Spark源码走读(十一)浅谈mllib中线性回归的算法实现&amp;Spark MLLib中拟牛顿法L-BFGS的源码实现

<一>浅谈mllib中线性回归的算法实现 概要 本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读. 线性回归模型 机器学习算法是的主要目的是找到最能够对数据做出合理解释的模型,这个模型是假设函数,一步步的推导基本遵循这样的思路 假设函数 为了找到最好的假设函数,需要找到合理的评估标准,一般来说使用损失函数来做为评估标准 根据损失函数推出目标函数 现在问题转换成为如何找到目标函数的最优解,也就是目标函数的最

Apache Spark源码走读(四)Hive on Spark运行环境搭建 &amp;hiveql on spark实现详解

<一>Hive on Spark运行环境搭建 楔子 Hive是基于Hadoop的开源数据仓库工具,提供了类似于SQL的HiveQL语言,使得上层的数据分析人员不用知道太多MapReduce的知识就能对存储于Hdfs中的海量数据进行分析.由于这一特性而收到广泛的欢迎. Hive的整体框架中有一个重要的模块是执行模块,这一部分是用Hadoop中MapReduce计算框架来实现,因而在处理速度上不是非常令人满意.由于Spark出色的处理速度,有人已经成功将HiveQL的执行利用Spark来运行,这就

Apache Spark源码走读(三)Spark on Yarn &amp;Spark源码编译 &amp;在YARN上运行SparkPi

<一>Spark on Yarn 概要 Hadoop2中的Yarn是一个分布式计算资源的管理平台,由于其有极好的模型抽象,非常有可能成为分布式计算资源管理的事实标准.其主要职责将是分布式计算集群的管理,集群中计算资源的管理与分配. Yarn为应用程序开发提供了比较好的实现标准,Spark支持Yarn部署,本文将就Spark如何实现在Yarn平台上的部署作比较详尽的分析. Spark Standalone部署模式回顾 上图是Spark Standalone Cluster中计算模块的简要示意,从

Apache Spark源码走读(六)Task运行期之函数调用关系分析 &amp;存储子系统分析

<一>Task运行期之函数调用关系分析 概要 本篇主要阐述在TaskRunner中执行的task其业务逻辑是如何被调用到的,另外试图讲清楚运行着的task其输入的数据从哪获取,处理的结果返回到哪里,如何返回. 准备 spark已经安装完毕 spark运行在local mode或local-cluster mode local-cluster mode local-cluster模式也称为伪分布式,可以使用如下指令运行 MASTER=local[1,2,1024] bin/spark-shell

Apache Spark源码走读(九)如何进行代码跟读&amp;使用Intellij idea调试Spark源码

<一>如何进行代码跟读 概要 今天不谈Spark中什么复杂的技术实现,只稍为聊聊如何进行代码跟读.众所周知,Spark使用scala进行开发,由于scala有众多的语法糖,很多时候代码跟着跟着就觉着线索跟丢掉了,另外Spark基于Akka来进行消息交互,那如何知道谁是接收方呢? new Throwable().printStackTrace 代码跟读的时候,经常会借助于日志,针对日志中输出的每一句,我们都很想知道它们的调用者是谁.但有时苦于对spark系统的了解程度不深,或者对scala认识不

Apache Spark源码走读(一)Spark论文阅读笔记&amp;Job提交与运行

<一>Spark论文阅读笔记 楔子 源码阅读是一件非常容易的事,也是一件非常难的事.容易的是代码就在那里,一打开就可以看到.难的是要通过代码明白作者当初为什么要这样设计,设计之初要解决的主要问题是什么. 在对Spark的源码进行具体的走读之前,如果想要快速对Spark的有一个整体性的认识,阅读Matei Zaharia做的Spark论文是一个非常不错的选择. 在阅读该论文的基础之上,再结合Spark作者在2012 Developer Meetup上做的演讲Introduction to Spa

Apache Spark源码走读(二)DStream实时流数据处理 &amp;DStream处理的容错性分析

<一>DStream实时流数据处理 Spark Streaming能够对流数据进行近乎实时的速度进行数据处理.采用了不同于一般的流式数据处理模型,该模型使得Spark Streaming有非常高的处理速度,与storm相比拥有更高的吞能力. 本篇简要分析Spark Streaming的处理模型,Spark Streaming系统的初始化过程,以及当接收到外部数据时后续的处理步骤. 系统概述 流数据的特点 与一般的文件(即内容已经固定)型数据源相比,所谓的流数据拥有如下的特点 数据一直处在变化中