GraphFrames 用户指南 - Scala

本文演示 GraphFrames 用户指南中的示例。

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._

创建 GraphFrame

可以从顶点和边缘数据帧创建 GraphFrame。

  • 顶点数据帧:顶点数据帧应包含一个名为 id 的特殊列,该列指定图形中每个顶点的唯一 ID。
  • 边缘数据帧:边缘数据帧应包含两个特殊列: src (边缘的源顶点 ID)和 dst (边缘的目标顶点 ID)。

这两个数据帧可以具有任意其他列。 这些列可以表示顶点和边缘属性。

创建顶点和边缘

// Vertex DataFrame
val v = spark.createDataFrame(List(
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
)).toDF("src", "dst", "relationship")

让我们用这些顶点和这些边创建一个图:

val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends

基本图形和数据帧查询

GraphFrame 提供简单的图形查询,例如节点度。

此外,由于 GraphFrames 将图形表示为顶点和边缘数据帧对,因此可以直接在顶点和边缘 DataFrame 上执行功能强大的查询。 这些数据帧可用作 GraphFrame 中的顶点和边缘字段。

display(g.vertices)
display(g.edges)

顶点的传入度:

display(g.inDegrees)

顶点的传出度:

display(g.outDegrees)

顶点的度:

display(g.degrees)

可以直接在顶点数据帧上运行查询。 例如,我们可以在图中找到最年轻的年龄:

val youngest = g.vertices.groupBy().min("age")
display(youngest)

同样,可以在边缘数据帧上运行查询。 例如,让我们计算一下图形中的“关注”关系的数目:

val numFollows = g.edges.filter("relationship = 'follow'").count()

装饰图形查找结果

使用图案构建涉及边缘和顶点的更复杂的关系。 下面的单元将查找其间的两个方向上都有边的顶点对。 结果是一个数据帧,其中列名称是图案键。

有关 API 的更多详细信息,请查看 GraphFrame 用户指南

// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

由于结果是一个数据帧,因此你可以在装饰图形的基础上构建更复杂的查询。 让我们找出一个人年龄大于 30 岁的所有相互关系:

val filtered = motifs.filter("b.age > 30")
display(filtered)

有状态查询

大多数图案查询都是无状态的,易于表达,如上面的示例所示。 接下来的几个示例演示了更复杂的查询,这些查询沿着模式中的路径承载状态信息。 通过将图形帧装饰图形查找结果与针对结果的筛选器组合使用来表示这些查询,其中,筛选器使用序列运算来构造一系列的数据帧列。

例如,假设你想通过一系列函数来确定一个具有某种特性的4个顶点链。 也就是说,在 4 个顶点 a->b->c->d的链中,标识匹配此复杂筛选器的链子集:

  • 在路径上初始化状态。
  • 基于顶点 a 更新状态。
  • 基于顶点 b 更新状态。
  • 等,适用于 c 和 d。
  • 如果最终状态与某些条件匹配,则筛选器接受链。

以下代码片段演示了此过程,其中我们识别了 4 个顶点的链条,以便至少 3 条边中的 2 条是“好友”关系。 在此示例中,状态是“朋友”边的当前计数;一般情况下,它可以是任何数据帧列。

// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

// Query on sequence, with state (cnt)
//  (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
  when(relationship === "friend", cnt + 1).otherwise(cnt)
}
//  (b) Use sequence operation to apply method to sequence of elements in motif.
//      In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
  foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
//  (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)

子图形

GraphFrames 通过筛选边缘和顶点来提供用于生成子图的 API。 这些筛选器可以组合在一起。 例如,以下子图仅包含朋友且年龄超过 30 岁的人。

// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
  .filterEdges("relationship = 'friend'")
  .filterVertices("age > 30")
  .dropIsolatedVertices()

复杂的三重筛选器

以下示例演示如何基于三元筛选器选择子图,这些筛选器在边缘及其“src”和“dst”顶点上运行。 使用更复杂的装饰图形将此示例扩展为超过三元组的操作非常简单。

// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
  .filter("e.relationship = 'follow'")
  .filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
//  val e2 = paths.select("e.*")

// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)

标准图形算法

本部分介绍 GraphFrames 中内置的标准图形算法。

广度优先搜索 (BFS)

从“Esther”中搜索年龄 < 32 的用户。

val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)

搜索还可以限制边缘筛选器和最大路径长度。

val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
  .edgeFilter("relationship != 'friend'")
  .maxPathLength(3)
  .run()
display(filteredPaths)

连接的组件

计算每个顶点的连接组件成员身份,并返回一个图,其中每个顶点都分配了组件 ID。

val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)

强连接的组件

计算每个顶点的强连通分量(SCC),并返回一个图,其中每个顶点都被分配到其所属的强连通分量。

val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))

标签传播

运行静态标签传播算法,用于检测网络中的社区。

网络中的每个节点最初都分配给自己的社区。 在每一个超级步骤中,节点向所有邻居发送其社区隶属关系,并将其状态更新为传入消息的模式社区隶属关系。

LPA 是图形的标准社区检测算法。 这在计算方面成本较低,但具有以下特点:(1) 不能保证收敛;(2) 最终可能会得到一些微不足道的解决方案(所有节点都标识为一个社区)。

val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))

PageRank

根据连接识别图形中的重要顶点。

// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)

最短路径

计算给定一组地标顶点的最短路径,其中地标由顶点 ID 指定。

val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)

三角形计数

计算通过每个顶点的三角形的数量。

import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends  // get example graph

val results = g.triangleCount.run()
results.select("id", "count").show()