本文共 17272 字,大约阅读时间需要 57 分钟。
免费开通大数据服务:
MaxCompute Graph 是基于飞天平台实现的面向迭代的图处理框架,为用户提供了类似于 Pregel 的编程接口。MaxCompute Graph(以下简称 Graph )作业包含图加载和计算两个阶段:
图的原始数据存在于MaxCompute 的表(table)中,每个 table 包含多个记录(record),每个 record 又包含多个列(field),图加载就是将这种形式的数据,转换成 vertex 和 edge的过程。
我们将图的加载过程实现为一个网络版的MapReduce过程,如下图所示:
下面我们分三个阶段,分别解析图载入的具体实现。
图作业将MaxCompute 中的table作为输入,需要做的第一步就是将输入表中的record分块,以便并行载入。输入的分解与MaxCompute MR相同,都是经过一下步骤:
额外解释一下第三步的切割(或合并),我们知道表或者分区的数据都是存储在一个或多个文件中的,而文件有大有小。在根据指定的字节数 split size划分块时,我们将大于split size的文件称为大文件,小于split size的小文件。
因此,为了保证每个块字节数基本相等,需要将大文件按照split size切分成小块,具体形式就是有三元组(文件名,起始偏移字节,块长度)表示,文件尾剩余的字节数如果过小,会合并到最后一块中。而对于小文件们,则需要将多个小文件作为一个块,具体形式有多个三元组组成【(文件名,0,文件长度),(文件名,0,文件长度)】,最终块的大小也会受块中小文件的个数限制(保证不会因为过多的小文件导致载入时间过长)。至此,我们已经将输入划分成了块,假设块的个数为m,根据不同的作业配置,数据并行载入的方式也不相同。
对于此种情况,worker和split是一对一的关系,worker数由split的个数决定,如果split个数超过1000,则作业会因为worker数超过1000报错。
如果split size比较小,导致split的个数多于设定的worker数,则worker与split的对应关系如下图:
如果split size比较大,导致split的个数少于设定的worker数,则worker与split的对应关系如下图:
因此,用户手工设定 worker 数的情况下,调整split size的大小,有助于提高数据的载入速度。
总结一下,本小节将输入分解成了块,并确定了worker和块之间的关系。不管哪种情况,最终的结果是,一个 worker 会分到零个或多个 split,下面就需要解析一个 worker 是怎么处理 split 的。
如上节所述,本节解释单个 worker 处理零个或多个 split 的执行过程。此阶段的实现需要用户实现 GraphLoader 接口,我们会为每个 split 创建单独的 GraphLoader 实例,这个实例负责将 split 中的 record 解释为 vertex 或者 edge。根据此 worker 分到的 split 的个数,分为三种情况:
从整体看,GraphLoader 实例会读取 split 中的每个 record,用户可根据此 record 中的语义(如来自哪张表,字段的含义等)解析成 vertex 和(或)edge,解析后的结果以键值对的形式存在,(id, vertex)表示为值为 id 的点增加一个 vertex 对象,(id, edge)表示为值为id的点增加一个 edge 对象。请注意,vertex 本身也是可以带有 edge 的。
此阶段,可以为单个 id 多次增加 vertex 或 edge 对象,无须关系是否重复,下个阶段会处理这种情况。此过程将 split 中的多个 record,转换成了多个 vertex 或 edge对象。用户可以将其理解成一个 map 过程,将 record 转换成一些(key, value),key本身会重复。
至此,我们已经将 split 中的 record,都转换成了(id, vertex)和(id, edge)这种形式的键值对,这些键值对是通过用户的 GraphLoader 接口生成的。我们在拿到这些键值对之后,会根据 id 将这些键值对分发到其它的 worker,默认的分发依据是 id 的hash值。同时,每个 worker 在接收到这些键值对后,会根据 id 分组,将相同 id的 vertex 和 edge 组织起来,具体组织形式见下图:
也就是组织成 key -> values 的形式,类似于 reduce 的输入形式,但不同的是,values 会根据 value 的类型进行划分,这里会将一个 id 对应的 values 划分为 vertex 和 edge 集合。
总结一下,本小节将所有 split 解析成了(id, vertex)和(id, edge),并将这些键值对 shuffle 到了指定的 worker,下面就需要解析 worker 是怎么处理这些键值对的。
Graph 中图是以邻接表的形式存储的,因此,内存中的对象组织形式就是,每个点对应一个 vertex 对象,而边作为 vertex 对象的成员变量存在,表示此 vertex 的出边。
因此,我们此阶段要做的工作,就是将从上一节得到的键值对,封装到一个 vertex 对象中,并把此 vertex 对象添加到最终的图中,此 vertex 对象也是计算阶段迭代遍历的vertex对象。 我们用下图表示此阶段的执行过程:此过程,针对每个 id 汇总后的结果,将上一节中针对此 id 的添加的所有 vertex 和 edge 都封装到一个 vertex 对象中。封装的过程是通过 VertexResolver 接口完成的,此接口我们会提供一个默认实现,用户也可以自定义,根据自己作业的情况进行特殊处理。
针对一个 id 添加的 vertex 和 edge,我们提供的默认实现有如下步骤:总结一下,本小节我们将 shuffle 之后的(id, vertex)和(id, edge)键值对,成功的封装成了最终的 vertex 对象,这些 vertex 对象最终也将参与后续的迭代计算。请注意,对于每个 id,都会经过这个阶段,并不是只有冲突的时候才会这样。
经过上一章节的介绍,我们已经对整个图数据加载的原理有了初步的认识,下面我们开始介绍主要的接口,从上一章节得知,用户只需要实现 GraphLoader 和 VertexResolver 接口,而 VertexResolver 接口我们提供了默认实现,因此一般情况下,用户只需要自定义 GraphLoader 接口就可以了。
先看一下 GraphLoader 接口的代码:
public abstract class GraphLoader{ public void setup(Configuration conf, int workerId, TableInfo tableInfo) throws IOException { } public abstract void load(LongWritable recordNum, Record record, MutationContext context) throws IOException;}
首先解释一下四个泛型参数的含义:
我们现在来看 GraphLoader 的两个方法的含义:
public interface MutationContext { public abstract Configuration getConfiguration(); public abstract int getNumWorkers(); public void addVertexRequest(Vertex vertex) throws IOException; public void removeVertexRequest(I vertexId) throws IOException; public abstract void addEdgeRequest(I sourceVertexId, Edge edge) throws IOException; public void removeEdgeRequest(I sourceVertexId, I targetVertexId) throws IOException; public abstract Counter getCounter(Enum name); public abstract Counter getCounter(String group, String name);}
其中 getConfiguration 方法的返回值与 setup 的 Configuration 参数相同,geNumWorkers获取作业总的 worker 的个数,getCounter 是允许用户自定义 counter 来做一些统计,在作业结束时,这些 counter 可以通过 sdk 被获取到。最重要的是剩余的四个方法,其中 removeVertexRequest 和
RemoveEdgeRequest 方法可以暂时忽略,它们的作用是可以根据输入数据,可以选择去删除一些点和边,在数据载入阶段,这种需求并不常见。
我们重点关注addVertexRequest和addEdgeRequest,这两个方法的作用,就是生成我们之前提到的(id, vertex)和(id, edge)。addVertexRequest(Vertex vertex)会生成(id, vertex)的键值对,id 是 vertex 的 id。addEdgeRequest(I sourceVertexId, Edge edge) 会生成(id, edge)的键值对,id是边的起始点,edge包含边的终点和值。,>
总结一下,实现 GraphLoader 的目的,就是根据上下文,在 load 方法中,解释 record,并调用 MutationContext 接口,请求将 vertex 和 edge 添加到图中。
请注意,这里的方法名字都带有Request字样,意思就是说调用这些方法,只代表一种请求,最终这些请求是否生效,取决于 VertexResolver 的实现。
在所有的 split 被 GraphLoader 处理结束后,我们会做一次同步,使得所有 worker 统一开始执行 VertexResolver,我们先看一下 VertexResolver的方法:
public abstract class VertexResolver { public void configure(Configuration conf) throws IOException { } public abstract Vertex resolve(I vertexId, Vertex vertex, VertexChanges vertexChanges, boolean hasMessages) throws IOException;}
每个 worker 上只会创建一个 VertexResolver的实例,这个实例负责处理属于此 worker 的所有点,即将关于一个 id 的 vertex 和 edge 封装成一个 vertex 对象。
public interface VertexChanges { public List> getAddedVertexList(); public int getRemovedVertexCount(); public List > getAddedEdgeList(); public List getRemovedEdgeList();}
看这个四个方法,是不是和 MutationContext 中的方法相似?是的, MutationContext中的add/remove*Reuqest方法分别与这里的get*List对应,也解释了上一小节所说的。MutationContext 的调用都是些请求,并不一定是最终的 vertex 和 edge,因为它们要在这里进行经过处理才能决定是否真正要添加到最终的图中。
总结一下,VertexResolver 是数据加载的最后一步,它的作用就是构造最终的图结构,构造过程就是将 GraphLoader 里生成的键值对进行封装,使得最终的图结构以一种邻接表的形式存在。
我们举两种类型的示例说明图的加载过程。输入数据一般可以划分成边类型和点类型的数据,我们分开举例说明。
边类型的数据可以用下面的表格表示:
SourceVertexID | DestinationVertexID | EdgeValue |
---|---|---|
id0 | id1 | 9 |
id0 | id2 | 5 |
id2 | id1 | 4 |
每条Record表示图中的一条边的格式,上图表示id0有两条出边,分别指向id1和id2;id2有一条出边,指向id1;id1没有出边。
对应的 GraphLoader 的实现为:
/** * 将Record解释为Edge,此处对于上述第一种情况,每个Record只表示一条Edge。 ** 类似于{
@link com.aliyun.odps.mapreduce.Mapper#map} * ,输入Record,生成键值对,此处的键是Vertex的ID, * 值是Edge,通过上下文Context写出,这些键值对会在LoadingVertexResolver出根据Vertex的ID汇总。 * * 注意:此处添加的点或边只是根据Record内容发出的请求,并不是最后参与计算的点或边,只有在随后的{ @link VertexResolver} * 中添加的点或边才参与计算。 */ public static class EdgeInputLoader extends GraphLoader{ /** * 配置EdgeInputLoader。 * * @param conf * 作业的配置参数,在main中使用GraphJob配置的,或者在console中set的 * @param workerId * 当前工作的worker的序号,从0开始,可以用于构造唯一的vertex id * @param inputTableInfo * 当前worker载入的输入表信息,可以用于确定当前输入是哪种类型的数据,即Record的格式 */ @Override public void setup(Configuration conf, int workerId, TableInfo inputTableInfo) { } /** * 根据Record中的内容,解析为对应的边,并请求添加到图中。 * * @param recordNum * 记录序列号,从1开始,每个worker上单独计数 * @param record * 输入表中的记录,三列,分别表示初点、终点、边的权重 * @param context * 上下文,请求将解释后的边添加到图中 */ @Override public void load( LongWritable recordNum, Record record, MutationContext context) throws IOException { /** * 1、第一列表示初始点的ID */ LongWritable sourceVertexID = (LongWritable) record.get(0); /** * 2、第二列表示终点的ID */ LongWritable destinationVertexID = (LongWritable) record.get(1); /** * 3、地三列表示边的权重 */ LongWritable edgeValue = (LongWritable) record.get(2); /** * 4、创建边,由终点ID和边的权重组成 */ Edge edge = new Edge ( destinationVertexID, edgeValue); /** * 5、请求给初始点添加边 */ context.addEdgeRequest(sourceVertexID, edge); /** * 6、如果每条Record表示双向边,重复4与5 Edge edge2 = new * Edge ( sourceVertexID, edgeValue); * context.addEdgeRequest(destinationVertexID, edge2); */ } }
对应的 VertexResolver的实现为:
/** * 汇总{ @link GraphLoader#load(LongWritable, Record, MutationContext)}生成的键值对,类似于 * { @link com.aliyun.odps.mapreduce.Reducer#reduce}。对于唯一的Vertex ID,所有关于这个ID上 * 添加\删除、点\边的行为都会放在{ @link VertexChanges}中。 * * 注意:此处并不只针对load方法中添加的有冲突的点或边才调用(冲突是指添加多个相同Vertex对象,添加重复边等), * 所有在load方法中请求生成的ID都会在此处被调用。 */ public static class EdgeResolver extends VertexResolver{ /** * 处理关于一个ID的添加或删除、点或边的请求。 * * * {
@link VertexChanges}有四个接口,分别与{ @link MutationContext}的四个接口对应: **
- { @link VertexChanges#getAddedVertexList()}与 * { @link MutationContext#addVertexRequest(Vertex)}对应, * 在load方法中,请求添加的ID相同的Vertex对象,会被汇总在返回的List中 *
- { @link VertexChanges#getAddedEdgeList()}与 * { @link MutationContext#addEdgeRequest(WritableComparable, Edge)} * 对应,请求添加的初始点ID相同的Edge对象,会被汇总在返回的List中 *
- { @link VertexChanges#getRemovedVertexCount()}与 * { @link MutationContext#removeVertexRequest(WritableComparable)} * 对应,请求删除的ID相同的Vertex,汇总的请求删除的次数作为返回值 *
- { @link VertexChanges#getRemovedEdgeList()}与 * { @link MutationContext#removeEdgeRequest(WritableComparable, WritableComparable)} * 对应,请求删除的初始点ID相同的Edge对象,会被汇总在返回的List中 *
* *
* 用户通过处理关于这个ID的变化,通过返回值声明此ID是否参与计算,如果返回的{
@link Vertex}不为null, * 则此ID会参与随后的计算,如果返回null,则不会参与计算。 * * @param vertexId * 请求添加的点的ID,或请求添加的边的初点ID * @param vertex * 已存在的Vertex对象,数据载入阶段,始终为null * @param vertexChanges * 此ID上的请求添加\删除、点\边的集合 * @param hasMessages * 此ID是否有输入消息,数据载入阶段,始终为false */ @Override public Vertexresolve( LongWritable vertexId, Vertex vertex, VertexChanges vertexChanges, boolean hasMessages) throws IOException { /** * 1、生成Vertex对象,作为参与计算的点。 */ EdgeVertex computeVertex = new EdgeVertex(); computeVertex.setId(vertexId); /** * 2、将请求给此点添加的边,添加到Vertex对象中,如果数据有重复的可能,根据算法需要决定是否去重。 */ if (vertexChanges.getAddedEdgeList() != null) { for (Edge edge : vertexChanges .getAddedEdgeList()) { computeVertex.addEdge(edge.getDestVertexId(), edge.getValue()); } } /** * 3、将Vertex对象返回,添加到最终的图中参与计算。 */ return computeVertex; } }
我们分析一个多路输入的例子。Graph作业指定两张表作为输入,一张是边类型的数据,格式如1中所示,另一张是点类型的数据,格式可以用下面的表格表示:
VertexID | VertexValue |
---|---|
id0 | 9 |
id1 | 7 |
id2 | 8 |
表示有三个点,id分别是id0,id1,id2,对应的点的值分别是9,7,8。
对应的 GraphLoader 的实现为:
/** * 将Record解释为Vertex和Edge,每个Record根据其来源,表示一个Vertex或者一条Edge。 ** 类似于{
@link com.aliyun.odps.mapreduce.Mapper#map} * ,输入Record,生成键值对,此处的键是Vertex的ID, * 值是Vertex或Edge,通过上下文Context写出,这些键值对会在LoadingVertexResolver出根据Vertex的ID汇总。 * * 注意:此处添加的点或边只是根据Record内容发出的请求,并不是最后参与计算的点或边,只有在随后的{ @link VertexResolver} * 中添加的点或边才参与计算。 */ public static class VertexInputLoader extends GraphLoader{ private boolean isEdgeData; /** * 配置VertexInputLoader。 * * @param conf * 作业的配置参数,在main中使用GraphJob配置的,或者在console中set的 * @param workerId * 当前工作的worker的序号,从0开始,可以用于构造唯一的vertex id * @param inputTableInfo * 当前worker载入的输入表信息,可以用于确定当前输入是哪种类型的数据,即Record的格式 */ @Override public void setup(Configuration conf, int workerId, TableInfo inputTableInfo) { isEdgeData = conf.get(EDGE_TABLE).equals(inputTableInfo.getTableName()); } /** * 根据Record中的内容,解析为对应的边,并请求添加到图中。 * * @param recordNum * 记录序列号,从1开始,每个worker上单独计数 * @param record * 输入表中的记录,三列,分别表示初点、终点、边的权重 * @param context * 上下文,请求将解释后的边添加到图中 */ @Override public void load( LongWritable recordNum, Record record, MutationContext context) throws IOException { if (isEdgeData) { /** * 数据来源于存储边信息的表。 * * 1、第一列表示初始点的ID */ LongWritable sourceVertexID = (LongWritable) record.get(0); /** * 2、第二列表示终点的ID */ LongWritable destinationVertexID = (LongWritable) record.get(1); /** * 3、地三列表示边的权重 */ LongWritable edgeValue = (LongWritable) record.get(2); /** * 4、创建边,由终点ID和边的权重组成 */ Edge edge = new Edge ( destinationVertexID, edgeValue); /** * 5、请求给初始点添加边 */ context.addEdgeRequest(sourceVertexID, edge); /** * 6、如果每条Record表示双向边,重复4与5 Edge edge2 = new * Edge ( sourceVertexID, edgeValue); * context.addEdgeRequest(destinationVertexID, edge2); */ } else { /** * 数据来源于存储点信息的表。 * * 1、第一列表示点的ID */ LongWritable vertexID = (LongWritable) record.get(0); /** * 2、第二列表示点的值 */ LongWritable vertexValue = (LongWritable) record.get(1); /** * 3、创建点,由点的ID和点的值组成 */ MyVertex vertex = new MyVertex(); /** * 4、初始化点 */ vertex.setId(vertexID); vertex.setValue(vertexValue); /** * 5、请求添加点 */ context.addVertexRequest(vertex); } } }
对应的 VertexResolver 的实现为:
/** * 汇总{ @link GraphLoader#load(LongWritable, Record, MutationContext)}生成的键值对,类似于 * { @link com.aliyun.odps.mapreduce.Reducer#reduce}。对于唯一的Vertex ID,所有关于这个ID上 * 添加\删除、点\边的行为都会放在{ @link VertexChanges}中。 * * 注意:此处并不只针对load方法中添加的有冲突的点或边才调用(冲突是指添加多个相同Vertex对象,添加重复边等), * 所有在load方法中请求生成的ID都会在此处被调用。 */ public static class LoadingResolver extends VertexResolver{ /** * 处理关于一个ID的添加或删除、点或边的请求。 * * * {
@link VertexChanges}有四个接口,分别与{ @link MutationContext}的四个接口对应: **
- { @link VertexChanges#getAddedVertexList()}与 * { @link MutationContext#addVertexRequest(Vertex)}对应, * 在load方法中,请求添加的ID相同的Vertex对象,会被汇总在返回的List中 *
- { @link VertexChanges#getAddedEdgeList()}与 * { @link MutationContext#addEdgeRequest(WritableComparable, Edge)} * 对应,请求添加的初始点ID相同的Edge对象,会被汇总在返回的List中 *
- { @link VertexChanges#getRemovedVertexCount()}与 * { @link MutationContext#removeVertexRequest(WritableComparable)} * 对应,请求删除的ID相同的Vertex,汇总的请求删除的次数作为返回值 *
- { @link VertexChanges#getRemovedEdgeList()}与 * { @link MutationContext#removeEdgeRequest(WritableComparable, WritableComparable)} * 对应,请求删除的初始点ID相同的Edge对象,会被汇总在返回的List中 *
* *
* 用户通过处理关于这个ID的变化,通过返回值声明此ID是否参与计算,如果返回的{
@link Vertex}不为null, * 则此ID会参与随后的计算,如果返回null,则不会参与计算。 * * @param vertexId * 请求添加的点的ID,或请求添加的边的初点ID * @param vertex * 已存在的Vertex对象,数据载入阶段,始终为null * @param vertexChanges * 此ID上的请求添加\删除、点\边的集合 * @param hasMessages * 此ID是否有输入消息,数据载入阶段,始终为false */ @Override public Vertexresolve( LongWritable vertexId, Vertex vertex, VertexChanges vertexChanges, boolean hasMessages) throws IOException { /** * 1、获取Vertex对象,作为参与计算的点。 */ MyVertex computeVertex = null; if (vertexChanges.getAddedVertexList() == null || vertexChanges.getAddedVertexList().isEmpty()) { computeVertex = new MyVertex(); computeVertex.setId(vertexId); } else { /** * 此处假设存储点信息的表中,每个Record表示唯一的点。 */ computeVertex = (MyVertex) vertexChanges.getAddedVertexList().get(0); } /** * 2、将请求给此点添加的边,添加到Vertex对象中,如果数据有重复的可能,根据算法需要决定是否去重。 */ if (vertexChanges.getAddedEdgeList() != null) { for (Edge edge : vertexChanges .getAddedEdgeList()) { computeVertex.addEdge(edge.getDestVertexId(), edge.getValue()); } } /** * 3、将Vertex对象返回,添加到最终的图中参与计算。 */ return computeVertex; } }
转载地址:http://tgvix.baihongyu.com/