Spark系统内的代码跟读

概要

今天我们只聊聊代码跟读的方法,不进行Spark中那些复杂的技术实现。想必大家都知道,Spark用scala进行开发,但是因为scala众多的语法糖的缘故,代码经常会跟着跟着就发现线索跟丢掉了,其次,Spark是基于Akka来进行交互的,那又怎么才能知道谁是接收方呢?

new Throwable().printStackTrace

在进行代码跟读时,我们要经常借助日志,而对于日志中输出的每一句,我们都希望知道谁在对它们进行调用。可是却苦于对spark了解得并不多,或者对scala不太熟悉,短时间内无法弄明白,那有没有什么比较容易的办法呢?

我的办法就是在日志出现的地方加入下面一句话

new Throwable().printStackTrace()

现在举一个实际的例子来说明问题。

比如我们在启动spark-shell之后,输入一句非常简单的sc.textFile("README.md"),会输出下述的log

14/07/05 19:53:27 INFO MemoryStore: ensureFreeSpace(32816) called with curMem=0, maxMem=308910489 14/07/05 19:53:27 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 294.6 MB)  14/07/05 19:53:27 DEBUG BlockManager: Put block broadcast_0 locally took  78 ms  14/07/05 19:53:27 DEBUG BlockManager: Putting block broadcast_0 without replication took  79 ms  res0: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at :13

那我很想知道是第二句日志所在的tryToPut函数是被谁调用的该怎么办?

办法就是打开MemoryStore.scala,找到下述语句

logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(            blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))

在这句话之上,添加如下语句

new Throwable().printStackTrace()

然后,重新进行源码编译

sbt/sbt assembly

再次打开spark-shell,执行sc.textFile("README.md"),就可以得到如下输出,从中可以清楚知道tryToPut的调用者是谁

14/07/05 19:53:27 INFO MemoryStore: ensureFreeSpace(32816) called with curMem=0, maxMem=308910489 14/07/05 19:53:27 WARN MemoryStore: just show the calltrace by entering some modified code  java.lang.Throwable    at org.apache.spark.storage.MemoryStore.tryToPut(MemoryStore.scala:182)    at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:76)    at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:92)    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:699)    at org.apache.spark.storage.BlockManager.put(BlockManager.scala:570)    at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:821)    at org.apache.spark.broadcast.HttpBroadcast.(HttpBroadcast.scala:52)    at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:35)    at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:29)    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:787)    at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:556)    at org.apache.spark.SparkContext.textFile(SparkContext.scala:468)    at $line5.$read$$iwC$$iwC$$iwC$$iwC.(:13)    at $line5.$read$$iwC$$iwC$$iwC.(:18)    at $line5.$read$$iwC$$iwC.(:20)    at $line5.$read$$iwC.(:22)    at $line5.$read.(:24)    at $line5.$read$.(:28)    at $line5.$read$.()    at $line5.$eval$.(:7)    at $line5.$eval$.()    at $line5.$eval.$print()    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)    at java.lang.reflect.Method.invoke(Method.java:483)    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)    at org.apache.spark.repl.Main$.main(Main.scala:31)    at org.apache.spark.repl.Main.main(Main.scala)    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)    at java.lang.reflect.Method.invoke(Method.java:483)    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)  14/07/05 19:53:27 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 294.6 MB)  14/07/05 19:53:27 DEBUG BlockManager: Put block broadcast_0 locally took  78 ms  14/07/05 19:53:27 DEBUG BlockManager: Putting block broadcast_0 without replication took  79 ms  res0: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at :13 git同步

对代码作了修改之后,如果并不想提交代码,那该如何将最新的内容同步到本地呢?

git reset --hard  git pull origin master Akka消息跟踪

追踪消息的接收者是谁,相对来说比较容易,只要使用好grep就可以了,当然前提是要对actor model有一点点了解。

还是举个实例吧,我们知道CoarseGrainedSchedulerBackend会发送LaunchTask消息出来,那么谁是接收方呢?只需要执行以下脚本即可。

grep LaunchTask -r core/src/main

从如下的输出中,可以清楚看出CoarseGrainedExecutorBackend是LaunchTask的接收方,接收到该函数之后的业务处理,只需要去看看接收方的receive函数即可。

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala:    case LaunchTask(data) =>  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala:        logError("Received LaunchTask command but executor was null")  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala:  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:          executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask)) 小结

今天的内容相对简单,没有技术含量,自己做个记述,免得时间久了,不记得。

时间: 2024-09-09 21:20:15

Spark系统内的代码跟读的相关文章

Apache Spark源码走读(九)如何进行代码跟读&使用Intellij idea调试Spark源码

<一>如何进行代码跟读 概要 今天不谈Spark中什么复杂的技术实现,只稍为聊聊如何进行代码跟读.众所周知,Spark使用scala进行开发,由于scala有众多的语法糖,很多时候代码跟着跟着就觉着线索跟丢掉了,另外Spark基于Akka来进行消息交互,那如何知道谁是接收方呢? new Throwable().printStackTrace 代码跟读的时候,经常会借助于日志,针对日志中输出的每一句,我们都很想知道它们的调用者是谁.但有时苦于对spark系统的了解程度不深,或者对scala认识不

代码跟读如何进行?

概要 今天,我们不谈Spark中复杂的技术实现,只是稍微如何进行代码跟读.Spark使用scala进行开发这是众所周知的事情,因为在scala有着众多的语法糖,在很多时候回出现代码跟着跟着就觉着线索跟丢了,另外Spark是基于Akka进行的信息交互,那么怎样知道对方是接收方? new Throwable().printStackTrace 在代码跟读的时候,用户常常会向日志求助,在阅览日志中输出的每一句后,很多人都会想知道它们的调用者是谁.但有时因为不够深入的了解spark系统,又或者因为对sc

赫夫曼编码有部分代码没读明白 有n个 权值 为什么编码结束符在n-1的位置 ?cd[n-1]?

问题描述 赫夫曼编码有部分代码没读明白 有n个 权值 为什么编码结束符在n-1的位置 ?cd[n-1]? #include #include #include typedef struct{ unsigned int weight; int parent,lchild,rchild; }HTNode,*HuffmanTree; //动态分配数组存储赫夫曼树 typedef char **HuffmanCode;//动态分配数组存储赫夫曼编码表 void Select(HuffmanTree HT

JAVA并发容器代码随读

1. java.util.concurrent所提供的并发容器 java.util.concurrent提供了多种并发容器,总体上来说有4类,队列类型的BlockingQueue和 ConcurrentLinkedQueue,Map类型的ConcurrentMap,Set类型的ConcurrentSkipListSet和CopyOnWriteArraySet,List类型的CopyOnWriteArrayList. 这些并发容器都采用了多种手段控制并发的存取操作,并且尽可能减小控制并发所带来的性

C# XML操作 代码大全(读XML,写XML,更新,删除节点,与dataset结合等)第1/2页_实用技巧

已知有一个XML文件(bookstore.xml)如下: Corets, Eva 5.95 1.插入节点 往节点中插入一个节点: 复制代码 代码如下: XmlDocument xmlDoc=new XmlDocument(); xmlDoc.Load("bookstore.xml"); XmlNode root=xmlDoc.SelectSingleNode("bookstore");//查找 XmlElement xe1=xmlDoc.CreateElement(

从代码层读懂Java HashMap的实现原理

概述 Hashmap继承于AbstractMap,实现了Map.Cloneable.Java.io.Serializable接口.它的key.value都可以为null,映射不是有序的.Hashmap不是同步的,如果想要线程安全的HashMap,可以通过Collections类的静态方法synchronizedMap获得线程安全的HashMap. Map map = Collections.synchronizedMap(new HashMap()); HashMap 中两个重要的参数:"初始容

C++11:使用 auto/decltype/result_of使代码可读易维护

C++11 终于加入了自动类型推导.以前,我们不得不使用Boost的相关组件来实现,现在,我们可以使用"原生态"的自动类型推导了! C++引入自动的类型推导,并不是在向动态语言(强类型语言又称静态类型语言,是指需要进行变量/对象类型声明的语言,一般情况下需要编译执行.例如C/C++/Java:弱类型语言又称动态类型语言,是指不需要进行变量/对象类型声明的语言,一般情况下不需要编译(但也有编译型的).例如PHP/ASP/Ruby/Python/Perl/ABAP/SQL/JavaScri

Android 命名规范 (提高代码可以读性)

先初略介绍下当前主要的标识符命名法和英文缩写规则:(这段别人那边借鉴的) 在讲解命名规范前,先初略介绍下当前主要的标识符命名法和英文缩写规则.标识符命名法标识符命名法最要有四种: 1 驼峰(Camel)命名法:又称小驼峰命名法,除首单词外,其余所有单词的第一个字母大写. 2 帕斯卡(pascal)命名法:又称大驼峰命名法,所有单词的第一个字母大写 3 下划线命名法:单词与单词间用下划线做间隔. 4 匈牙利命名法:广泛应用于微软编程环境中,在以Pascal命名法的变量前附加小写序列说明该变量的类型

从代码层读懂HashMap的实现原理

概述 Hashmap继承于AbstractMap,实现了Map.Cloneable.java.io.Serializable接口.它的key.value都可以为null,映射不是有序的. Hashmap不是同步的,如果想要线程安全的HashMap,可以通过Collections类的静态方法synchronizedMap获得线程安全的HashMap. Map map = Collections.synchronizedMap(new HashMap()); HashMap 中两个重要的参数:"初始