背景
用 GraphX 实现 k-core 比较简单:淘宝技术部之前的文章已经给出了一个代码片段,在它的基础上稍作修改,就可以满足自己的需求。
code
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx.lib._
// Iterative k-core pruning with GraphX (spark-shell script; `sc` is the shell's SparkContext).
//
// Starting from the full graph, repeatedly drop every vertex whose degree is below `kNum`,
// recompute degrees on what is left, and stop once the vertex count no longer changes
// (or after `maxIter` rounds). The surviving graph is left in `degreeGraph`, whose vertex
// attribute is the vertex's degree inside that surviving graph.

/** Drops the cached vertex and edge data of a graph that is no longer needed (non-blocking). */
def releaseGraph[VD, ED](graph: Graph[VD, ED]): Unit = {
  graph.unpersistVertices(blocking = false)
  graph.edges.unpersist(blocking = false)
}

// Load the graph from an edge-list file.
// Positional arguments after the path: canonicalOrientation = false, 512 edge partitions,
// edges stored MEMORY_ONLY, vertices stored DISK_ONLY.
// NOTE(review): the trailing '.' in the path looks like a typo — confirm against the real data location.
val friendsGraph = GraphLoader.edgeListFile(
  sc, "data/friends.txt.", false, 512, StorageLevel.MEMORY_ONLY, StorageLevel.DISK_ONLY)

// Attach each vertex's degree as its attribute; vertices without any edge get 0.
var degreeGraph = friendsGraph.outerJoinVertices(friendsGraph.degrees) {
  (vid, vd, degree) => degree.getOrElse(0)
}.cache()

val kNum = 200    // minimum degree a vertex must keep to stay in the core
var lastVerticeNum: Long = degreeGraph.numVertices
var thisVerticeNum: Long = -1
var isConverged = false
val maxIter = 10  // upper bound on pruning rounds
var i = 1

while (!isConverged && i <= maxIter) {
  // Remember the graph we are about to replace so its cache can be released afterwards.
  val previousGraph = degreeGraph

  // Keep only vertices that still have at least kNum neighbours.
  val subGraph = degreeGraph.subgraph(
    vpred = (vid, degree) => degree >= kNum
  ).cache()

  // Removing vertices lowers the degree of their neighbours, so recompute degrees.
  degreeGraph = subGraph.outerJoinVertices(subGraph.degrees) {
    (vid, vd, degree) => degree.getOrElse(0)
  }.cache()

  thisVerticeNum = degreeGraph.numVertices
  // numVertices only materializes the vertices; force the edges too, so that releasing the
  // older graphs below cannot trigger a recomputation of the whole lineage later on.
  degreeGraph.edges.count()

  // The superseded graphs are never read again; without this every round would leave two
  // more cached graphs behind in executor storage.
  releaseGraph(previousGraph)
  releaseGraph(subGraph)

  if (lastVerticeNum == thisVerticeNum) {
    isConverged = true
    println(s"vertice num is $thisVerticeNum, iteration is $i")
  } else {
    println(s"lastVerticeNum is $lastVerticeNum, thisVerticeNum is $thisVerticeNum, iteration is $i, not converge")
    lastVerticeNum = thisVerticeNum
  }
  i += 1
}

// Make it visible when the loop gave up instead of converging: in that case degreeGraph
// may still contain vertices whose degree is below kNum.
if (!isConverged) {
  println(s"k-core pruning stopped after $maxIter iterations without converging")
}
// do something to degreeGraph
这种做法的性能主要取决于每轮迭代中子图的计算速度。
全文完 :)
时间: 2024-12-02 02:22:47