打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
GraphX Pregel API · spark

Pregel API

图本身是递归数据结构,顶点的属性依赖于它们邻居的属性,这些邻居的属性又依赖于自己邻居的属性。所以许多重要的图算法都是迭代的重新计算每个顶点的属性,直到满足某个确定的条件。一系列的图并发(graph-parallel)抽象已经被提出来用来表达这些迭代算法。GraphX公开了一个类似Pregel的操作,它是广泛使用的PregelGraphLab抽象的一个融合。

+

GraphX中实现的这个更高级的Pregel操作是一个约束到图拓扑的批量同步(bulk-synchronous)并行消息抽象。Pregel操作者执行一系列的超步(super steps),在这些步骤中,顶点从之前的超步中接收进入(inbound)消息的总和,为顶点属性计算一个新的值,然后在以后的超步中发送消息到邻居顶点。不像Pregel而更像GraphLab,消息通过边triplet的一个函数被并行计算,消息的计算既会访问源顶点特征也会访问目的顶点特征。在超步中,没有收到消息的顶点会被跳过。当没有消息遗留时,Pregel操作停止迭代并返回最终的图。

+

注意,与标准的Pregel实现不同的是,GraphX中的顶点仅仅能发送信息给邻居顶点,并且可以利用用户自定义的消息函数并行地构造消息。这些限制允许对GraphX进行额外的优化。

+

下面的代码是pregel的具体实现。

+

def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]     (graph: Graph[VD, ED],      initialMsg: A,      maxIterations: Int = Int.MaxValue,      activeDirection: EdgeDirection = EdgeDirection.Either)     (vprog: (VertexId, VD, A) => VD,      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],      mergeMsg: (A, A) => A)    : Graph[VD, ED] =  {    var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()    // 计算消息    var messages = g.mapReduceTriplets(sendMsg, mergeMsg)    var activeMessages = messages.count()    // 迭代    var prevG: Graph[VD, ED] = null    var i = 0    while (activeMessages > 0 && i < maxIterations) {      // 接收消息并更新顶点      prevG = g      g = g.joinVertices(messages)(vprog).cache()      val oldMessages = messages      // 发送新消息      messages = g.mapReduceTriplets(        sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()      activeMessages = messages.count()      i += 1    }    g  }

1 pregel计算模型

Pregel计算模型中有三个重要的函数,分别是vertexProgramsendMessagemessageCombiner

+

  • vertexProgram:用户定义的顶点运行程序。它作用于每一个顶点,负责接收进来的信息,并计算新的顶点值。

    +

  • sendMsg:发送消息

    +

  • mergeMsg:合并消息

    +

我们具体分析它的实现。根据代码可以知道,这个实现是一个迭代的过程。在开始迭代之前,先完成一些初始化操作:

+

var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()// 计算消息var messages = g.mapReduceTriplets(sendMsg, mergeMsg)var activeMessages = messages.count()

程序首先用vprog函数处理图中所有的顶点,生成新的图。然后用生成的图调用聚合操作(mapReduceTriplets,实际的实现是我们前面章节讲到的aggregateMessagesWithActiveSet函数)获取聚合后的消息。activeMessagesmessages这个VertexRDD中的顶点数。

+

下面就开始迭代操作了。在迭代内部,分为二步。

+

  • 1 接收消息,并更新顶点
 g = g.joinVertices(messages)(vprog).cache() //joinVertices的定义 def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)     : Graph[VD, ED] = {     val uf = (id: VertexId, data: VD, o: Option[U]) => {       o match {         case Some(u) => mapFunc(id, data, u)         case None => data       }     }     graph.outerJoinVertices(table)(uf)   }

这一步实际上是使用outerJoinVertices来更新顶点属性。outerJoinVertices关联操作中有详细介绍。

+

  • 2 发送新消息
 messages = g.mapReduceTriplets(        sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()

注意,在上面的代码中,mapReduceTriplets多了一个参数Some((oldMessages, activeDirection))。这个参数的作用是:它使我们在发送新的消息时,会忽略掉那些两端都没有接收到消息的边,减少计算量。

+

2 pregel实现最短路径

import org.apache.spark.graphx._import org.apache.spark.graphx.util.GraphGeneratorsval graph: Graph[Long, Double] =  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)val sourceId: VertexId = 42 // The ultimate source// 初始化图val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)val sssp = initialGraph.pregel(Double.PositiveInfinity)(  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program  triplet => {  // Send Message    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))    } else {      Iterator.empty    }  },  (a,b) => math.min(a,b) // Merge Message  )println(sssp.vertices.collect.mkString("\n"))

上面的例子中,Vertex Program函数定义如下:

+

(id, dist, newDist) => math.min(dist, newDist)

这个函数的定义显而易见,当两个消息来的时候,取它们当中路径的最小值。同理Merge Message函数也是同样的含义。

+

Send Message函数中,会首先比较triplet.srcAttr + triplet.attrtriplet.dstAttr,即比较加上边的属性后,这个值是否小于目的节点的属性,如果小于,则发送消息到目的顶点。

+

3 参考文献

【1】spark源码

+

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Pregel: A System for Large-Scale Graph Processing
干货 | 大数据处理技术的总结与分析
Pregel: A System for Large
AMiner出品|迅速带你入门图计算领域,技术人才趋势产业一应俱全
京东图计算系统JoyGraph
大规模图计算研究
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服