Giraph Source Code Analysis (8): Counting the Vertices That Compute in Each Superstep
Author: Bai Song (白松)
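Purpose: For my research I need to know how many vertices take part in computation (that is, have their compute() method called) in each iteration, so that the system can be optimized further. For example, the last line of the SSSP compute() method calls voteToHalt() on the current vertex, which switches it to the inactive state. As a result, every vertex is inactive at the end of each iteration. After the global barrier synchronization, vertices that have received messages are reactivated and their compute() method is called. This article shows how to count the vertices that compute in each iteration. The SSSP compute() method is shown below: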
@Override
public void compute(Iterable<DoubleWritable> messages) {
  if (getSuperstep() == 0) {
    setValue(new DoubleWritable(Double.MAX_VALUE));
  }
  double minDist = isSource() ? 0d : Double.MAX_VALUE;
  for (DoubleWritable message : messages) {
    minDist = Math.min(minDist, message.get());
  }
  if (minDist < getValue().get()) {
    setValue(new DoubleWritable(minDist));
    for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
      double distance = minDist + edge.getValue().get();
      sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
    }
  }
  // Mark this vertex as inactive
  voteToHalt();
}
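To make the measured quantity concrete, here is a minimal single-process sketch of Pregel-style supersteps running the SSSP logic above. It is not Giraph code: the class ToySsspBsp, its Edge record and the adjacency-map input are hypothetical. It assumes the semantics described above: every vertex is active in superstep 0, compute() ends with a vote to halt, a halted vertex that receives a message is woken up, and the run stops when no vertex is active and no message is pending.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical single-process model of Pregel-style SSSP supersteps (not Giraph code). */
class ToySsspBsp {
  /** Directed, weighted edge (hypothetical helper type). */
  record Edge(int target, double weight) {}

  /** Runs SSSP on vertices 0..n-1 and returns, per superstep, how many vertices ran compute(). */
  static List<Long> computedPerSuperstep(int n, Map<Integer, List<Edge>> edges, int source) {
    double[] value = new double[n];
    boolean[] halted = new boolean[n];                 // all false: every vertex active in superstep 0
    Map<Integer, List<Double>> inbox = new HashMap<>();
    List<Long> counts = new ArrayList<>();
    for (long superstep = 0; ; superstep++) {
      Map<Integer, List<Double>> outbox = new HashMap<>();
      long computed = 0;
      for (int v = 0; v < n; v++) {
        List<Double> messages = inbox.getOrDefault(v, List.of());
        if (halted[v] && !messages.isEmpty()) {
          halted[v] = false;                           // an incoming message wakes the vertex up
        }
        if (halted[v]) {
          continue;                                    // inactive and no messages: compute() skipped
        }
        computed++;                                    // this vertex runs compute() this superstep
        if (superstep == 0) {
          value[v] = Double.MAX_VALUE;
        }
        double minDist = (v == source) ? 0d : Double.MAX_VALUE;
        for (double m : messages) {
          minDist = Math.min(minDist, m);
        }
        if (minDist < value[v]) {
          value[v] = minDist;
          for (Edge e : edges.getOrDefault(v, List.of())) {
            outbox.computeIfAbsent(e.target(), k -> new ArrayList<>()).add(minDist + e.weight());
          }
        }
        halted[v] = true;                              // voteToHalt()
      }
      counts.add(computed);
      inbox = outbox;                                  // messages are delivered in the next superstep
      boolean anyActive = false;
      for (boolean h : halted) {
        anyActive |= !h;
      }
      if (!anyActive && inbox.isEmpty()) {
        return counts;                                 // no active vertex and no pending message
      }
    }
  }
}
```

For a hypothetical 4-vertex graph with edges 0→1 (weight 1), 0→2 (4), 1→2 (2), 2→3 (1) and source 0, the per-superstep counts are 4, 2, 2, 1: all vertices compute in superstep 0, and afterwards only the vertices that received a message do.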
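Note: in Giraph, a computation terminates only when there are no active vertices and no messages are in transit between workers.
In Hama 0.6.0, by contrast, the only termination condition is whether any active vertex remains. This does not truly follow the Pregel model; it is a half-finished implementation.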
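The two termination rules can be written as simple predicates. This is a hypothetical sketch: the class and method names are mine, not Giraph's or Hama's, and each method only encodes the rule as stated above.

```java
/** Hypothetical predicates for the two termination rules described above. */
final class TerminationRules {
  private TerminationRules() {}

  /** Pregel/Giraph style: stop only when no vertex is active AND no message is pending. */
  static boolean giraphShouldHalt(long activeVertices, long pendingMessages) {
    return activeVertices == 0 && pendingMessages == 0;
  }

  /** The rule stated above for Hama 0.6.0: stop as soon as no vertex is active. */
  static boolean activeOnlyShouldHalt(long activeVertices) {
    return activeVertices == 0;
  }
}
```

With activeVertices = 0 and pendingMessages = 3, giraphShouldHalt returns false, so the recipients are woken up in the next superstep, while activeOnlyShouldHalt returns true and those messages are never processed. Because SSSP's compute() always ends with voteToHalt(), the active-only rule would see zero active vertices after the very first superstep.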
The changes are as follows:
- The org.apache.giraph.partition.PartitionStats class
Add a field and methods that count, for each partition, the vertices that compute in each superstep. The added field and methods are:
/** computed vertices in this partition */
private long computedVertexCount = 0;

/**
 * Increment the computed vertex count by one.
 */
public void incrComputedVertexCount() {
  ++computedVertexCount;
}

/**
 * @return the computedVertexCount
 */
public long getComputedVertexCount() {
  return computedVertexCount;
}
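Modify the readFields() and write() methods by appending one statement to the end of each. When a partition finishes computing, it sends its stats, including computedVertexCount, to the master, which reads and aggregates them. readFields() must read the fields in exactly the order write() wrote them, which is why the new field goes last in both methods.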
@Override
public void readFields(DataInput input) throws IOException {
  partitionId = input.readInt();
  vertexCount = input.readLong();
  finishedVertexCount = input.readLong();
  edgeCount = input.readLong();
  messagesSentCount = input.readLong();
  // Added: read the computed vertex count
  computedVertexCount = input.readLong();
}

@Override
public void write(DataOutput output) throws IOException {
  output.writeInt(partitionId);
  output.writeLong(vertexCount);
  output.writeLong(finishedVertexCount);
  output.writeLong(edgeCount);
  output.writeLong(messagesSentCount);
  // Added: write the computed vertex count
  output.writeLong(computedVertexCount);
}
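Because the fields are written as raw bytes with no names, the two methods only work as a pair. The sketch below checks that pairing with a hypothetical, Hadoop-free stand-in (PartitionStatsSketch, toBytes and fromBytes are illustrative names, not Giraph API) that uses plain java.io streams, which implement the same DataInput/DataOutput interfaces the real methods receive.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for the modified PartitionStats (no Hadoop dependency). */
class PartitionStatsSketch {
  int partitionId;
  long vertexCount;
  long finishedVertexCount;
  long edgeCount;
  long messagesSentCount;
  long computedVertexCount;                      // the new field

  void write(DataOutput output) throws IOException {
    output.writeInt(partitionId);
    output.writeLong(vertexCount);
    output.writeLong(finishedVertexCount);
    output.writeLong(edgeCount);
    output.writeLong(messagesSentCount);
    output.writeLong(computedVertexCount);       // appended last
  }

  void readFields(DataInput input) throws IOException {
    partitionId = input.readInt();
    vertexCount = input.readLong();
    finishedVertexCount = input.readLong();
    edgeCount = input.readLong();
    messagesSentCount = input.readLong();
    computedVertexCount = input.readLong();      // read last, mirroring write()
  }

  /** Serializes the stats, mirroring how they travel from worker to master. */
  static byte[] toBytes(PartitionStatsSketch stats) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      stats.write(out);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Deserializes stats produced by toBytes(). */
  static PartitionStatsSketch fromBytes(byte[] data) {
    PartitionStatsSketch stats = new PartitionStatsSketch();
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      stats.readFields(in);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return stats;
  }
}
```

The serialized record is 44 bytes (one 4-byte int plus five 8-byte longs). If only write() were updated, readFields() would stop 8 bytes short, and anything read next from the same stream would start at the wrong offset.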
- The org.apache.giraph.graph.GlobalStats class
Add a field and a getter that hold the total number of vertices computed in each superstep, summed over all partitions on every worker.
/**
 * Computed vertices in all partitions in this superstep
 * Added by BaiSong
 */
private long computedVertexCount = 0;

/**
 * @return the computedVertexCount
 */
public long getComputedVertexCount() {
  return computedVertexCount;
}
Modify the addPartitionStats(PartitionStats partitionStats) method so that it also accumulates computedVertexCount.
/**
 * Add the stats of a partition to the global stats.
 *
 * @param partitionStats Partition stats to be added.
 */
public void addPartitionStats(PartitionStats partitionStats) {
  this.vertexCount += partitionStats.getVertexCount();
  this.finishedVertexCount += partitionStats.getFinishedVertexCount();
  this.edgeCount += partitionStats.getEdgeCount();
  // Added by BaiSong: accumulate the computed vertex count
  this.computedVertexCount += partitionStats.getComputedVertexCount();
}
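The aggregation is a plain sum over partitions. The sketch below mirrors addPartitionStats() with hypothetical stand-in types (GlobalStatsSketch and PartitionCounts keep only two of the fields). It assumes, consistent with the per-superstep output in the results below, that the master builds a fresh GlobalStats for every superstep, so the sum is the number of vertices computed in that superstep across all workers.

```java
import java.util.List;

/** Hypothetical stand-in for GlobalStats: the master folds in every partition's stats. */
class GlobalStatsSketch {
  /** Minimal per-partition stats; only the fields used here. */
  record PartitionCounts(long vertexCount, long computedVertexCount) {}

  private long vertexCount;
  private long computedVertexCount;

  void addPartitionStats(PartitionCounts p) {
    vertexCount += p.vertexCount();
    computedVertexCount += p.computedVertexCount();   // the added accumulation
  }

  long getVertexCount() {
    return vertexCount;
  }

  long getComputedVertexCount() {
    return computedVertexCount;
  }

  /** Aggregates the stats of all partitions (from all workers) for one superstep. */
  static GlobalStatsSketch aggregate(List<PartitionCounts> allPartitions) {
    GlobalStatsSketch global = new GlobalStatsSketch();
    for (PartitionCounts p : allPartitions) {
      global.addPartitionStats(p);
    }
    return global;
  }
}
```

For three hypothetical partitions reporting (5 vertices, 2 computed), (3, 1) and (4, 4), the aggregate is 12 vertices of which 7 computed in that superstep.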
For easier debugging, you can optionally also modify the class's toString() method as follows:
public String toString() {
  return "(vtx=" + vertexCount + ", computedVertexCount=" + computedVertexCount +
      ",finVtx=" + finishedVertexCount + ",edges=" + edgeCount +
      ",msgCount=" + messageCount + ",haltComputation=" + haltComputation + ")";
}
- The org.apache.giraph.graph.ComputeCallable class
Add the counting itself: in the computePartition() method, add the statement shown below.
if (!vertex.isHalted()) {
  context.progress();
  TimerContext computeOneTimerContext = computeOneTimer.time();
  try {
    vertex.compute(messages);
    // Added: once the vertex's compute() returns, increment this partition's computedVertexCount by 1
    partitionStats.incrComputedVertexCount();
  } finally {
    computeOneTimerContext.stop();
  }
  ……
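The increment sits inside the `if (!vertex.isHalted())` branch, right after compute() returns, so a vertex is counted only if it actually ran compute() in this superstep. The sketch below models just that per-partition loop. ToyVertex and its fields are hypothetical, and the wake-up step encodes the behavior described at the start of this article (a halted vertex that has received messages is reactivated before it is checked).

```java
import java.util.List;

/** Hypothetical model of one partition's compute loop, showing where the count is taken. */
class ComputePartitionSketch {
  /** Hypothetical vertex carrying only what the loop needs. */
  static final class ToyVertex {
    boolean halted;
    final int pendingMessages;

    ToyVertex(boolean halted, int pendingMessages) {
      this.halted = halted;
      this.pendingMessages = pendingMessages;
    }

    void wakeUp() {
      halted = false;
    }

    /** Stands in for an SSSP-like compute() that ends with voteToHalt(). */
    void compute() {
      halted = true;
    }
  }

  /** Returns how many vertices in the partition ran compute() this superstep. */
  static long computePartition(List<ToyVertex> partition) {
    long computedVertexCount = 0;                 // plays the role of the new PartitionStats field
    for (ToyVertex vertex : partition) {
      if (vertex.halted && vertex.pendingMessages > 0) {
        vertex.wakeUp();                          // incoming messages reactivate a halted vertex
      }
      if (!vertex.halted) {
        vertex.compute();
        computedVertexCount++;                    // like incrComputedVertexCount() after compute()
      }
    }
    return computedVertexCount;
  }
}
```

For a partition holding one active vertex, one halted vertex with 2 pending messages and one halted vertex with none, computePartition returns 2, and all three vertices are halted afterwards.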
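- Add Hadoop counters. This is similar to my earlier post, Giraph Source Code Analysis (7): Adding Message Statistics, so I won't go into detail here. The added class is org.apache.giraph.counters.GiraphComputedVertex; its source code is below: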
package org.apache.giraph.counters;

import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.mapreduce.Mapper.Context;

import com.google.common.collect.Maps;

/**
 * Hadoop Counters in group "Giraph Computed Vertex" for counting, in every
 * superstep, the number of vertices that computed.
 */
public class GiraphComputedVertex extends HadoopCountersBase {
  /** Counter group name for the computed vertex counters */
  public static final String GROUP_NAME = "Giraph Computed Vertex";
  /** Singleton instance for everyone to use */
  private static GiraphComputedVertex INSTANCE;

  /** Computed vertex counter for each superstep */
  private final Map<Long, GiraphHadoopCounter> superstepVertexCount;

  private GiraphComputedVertex(Context context) {
    super(context, GROUP_NAME);
    superstepVertexCount = Maps.newHashMap();
  }

  /**
   * Instantiate with Hadoop Context.
   *
   * @param context Hadoop Context to use.
   */
  public static void init(Context context) {
    INSTANCE = new GiraphComputedVertex(context);
  }

  /**
   * Get singleton instance.
   *
   * @return singleton GiraphComputedVertex instance.
   */
  public static GiraphComputedVertex getInstance() {
    return INSTANCE;
  }

  /**
   * Get the computed vertex counter for a superstep.
   *
   * @param superstep superstep to get the counter for
   * @return counter for the given superstep, created on first use
   */
  public GiraphHadoopCounter getSuperstepVertexCount(long superstep) {
    GiraphHadoopCounter counter = superstepVertexCount.get(superstep);
    if (counter == null) {
      String counterPrefix = "Superstep: " + superstep + " ";
      counter = getCounter(counterPrefix);
      superstepVertexCount.put(superstep, counter);
    }
    return counter;
  }

  @Override
  public Iterator<GiraphHadoopCounter> iterator() {
    return superstepVertexCount.values().iterator();
  }
}
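At its core, GiraphComputedVertex is a lazily populated map from superstep number to counter. The sketch below models that pattern without Hadoop: AtomicLong stands in for GiraphHadoopCounter, and the class name SuperstepVertexCounters is hypothetical. It replaces the get/null-check/put sequence with computeIfAbsent and uses a ConcurrentSkipListMap, so iteration follows superstep order and concurrent callers get the same counter back. Whether the original class needs either property depends on how it is called, which the article does not show.

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, Hadoop-free model of the per-superstep counter registry. */
class SuperstepVertexCounters implements Iterable<AtomicLong> {
  /** One counter per superstep, kept sorted by superstep number. */
  private final ConcurrentNavigableMap<Long, AtomicLong> bySuperstep =
      new ConcurrentSkipListMap<>();

  /** Returns the counter for a superstep, creating it on first use. */
  AtomicLong getSuperstepVertexCount(long superstep) {
    return bySuperstep.computeIfAbsent(superstep, s -> new AtomicLong());
  }

  /** Counter name, built the same way as counterPrefix in the class above. */
  static String counterName(long superstep) {
    return "Superstep: " + superstep + " ";
  }

  @Override
  public Iterator<AtomicLong> iterator() {
    return bySuperstep.values().iterator();        // ascending superstep order
  }
}
```

Repeated calls for the same superstep return the same counter, so several additions for superstep 2 accumulate in one place, and iterating yields the counters for supersteps 0, 1, 2, … in order regardless of creation order.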
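- Experimental results. After the job runs, the terminal prints the total number of vertices that computed in each iteration. I tested SSSP (the SimpleShortestPathsVertex class) on an input graph with 9 vertices and 12 edges. The output is shown below: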
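In the test shown in the figure above there were 6 iterations. The red box shows the number of vertices that computed in each iteration: 9, 4, 4, 3, 4, 0.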
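Explanation: in superstep 0 every vertex is active, so all 9 vertices compute. In superstep 5 no vertex computes, so no messages are sent; since every vertex is also inactive, the computation terminates.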
[Read more articles on the Shulan Community (数澜社区)]