
Giraph Source Code Analysis (8): Counting the Vertices That Take Part in Computation in Each SuperStep

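Author | Bai Song (白松)

Purpose: for my research I need to analyze how many vertices take part in computation in each iteration, in order to optimize the system further. For example, the last line of the SSSP compute() method calls voteToHalt() on the current vertex, which puts it into the InActive state. The vertex therefore runs compute() again only if it receives a message in a later superstep.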



```java
@Override
public void compute(Iterable<DoubleWritable> messages) {
  if (getSuperstep() == 0) {
    setValue(new DoubleWritable(Double.MAX_VALUE));
  }
  double minDist = isSource() ? 0d : Double.MAX_VALUE;
  for (DoubleWritable message : messages) {
    minDist = Math.min(minDist, message.get());
  }
  if (minDist < getValue().get()) {
    setValue(new DoubleWritable(minDist));
    for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
      double distance = minDist + edge.getValue().get();
      sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
    }
  }
  // Put the vertex into the InActive (halted) state
  voteToHalt();
}
```
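The following simulation shows why the number of computing vertices changes from superstep to superstep. It is a minimal standalone sketch, not Giraph code: the class SsspComputedCountSim and its run() method are hypothetical names. It replays the compute() logic above under Pregel semantics, assuming (as in Giraph) that a halted vertex is woken up when a message arrives for it. For each superstep it counts how many vertices actually run compute().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical standalone simulation, not Giraph code: it replays the SSSP
// compute() logic above under Pregel semantics and records how many vertices
// actually run compute() in each superstep.
class SsspComputedCountSim {
  /**
   * @param n       number of vertices (ids 0..n-1)
   * @param edges   edges[e] = {source, target}
   * @param weights weights[e] = weight of edges[e]
   * @param source  id of the SSSP source vertex
   * @param dist    output array of length n, filled with the final distances
   * @return one entry per superstep: how many vertices ran compute()
   */
  static List<Long> run(int n, int[][] edges, double[] weights, int source, double[] dist) {
    List<List<Integer>> out = new ArrayList<>(); // outgoing edge indices per vertex
    for (int v = 0; v < n; v++) {
      out.add(new ArrayList<>());
    }
    for (int e = 0; e < edges.length; e++) {
      out.get(edges[e][0]).add(e);
    }
    boolean[] halted = new boolean[n]; // every vertex starts active
    Map<Integer, List<Double>> inbox = new HashMap<>();
    List<Long> computedPerSuperstep = new ArrayList<>();
    for (long superstep = 0; ; superstep++) {
      Map<Integer, List<Double>> nextInbox = new HashMap<>();
      long computed = 0;
      for (int v = 0; v < n; v++) {
        List<Double> messages = inbox.getOrDefault(v, new ArrayList<>());
        if (halted[v] && !messages.isEmpty()) {
          halted[v] = false; // an incoming message wakes a halted vertex up
        }
        if (halted[v]) {
          continue; // halted and no messages: compute() is skipped
        }
        computed++; // the point where incrComputedVertexCount() would be called
        if (superstep == 0) {
          dist[v] = Double.MAX_VALUE;
        }
        double minDist = (v == source) ? 0d : Double.MAX_VALUE;
        for (double m : messages) {
          minDist = Math.min(minDist, m);
        }
        if (minDist < dist[v]) {
          dist[v] = minDist;
          for (int e : out.get(v)) {
            nextInbox.computeIfAbsent(edges[e][1], k -> new ArrayList<>())
                .add(minDist + weights[e]);
          }
        }
        halted[v] = true; // voteToHalt()
      }
      if (computed == 0) {
        break; // all vertices halted and no messages in flight: the job ends
      }
      computedPerSuperstep.add(computed);
      inbox = nextInbox;
    }
    return computedPerSuperstep;
  }
}
```

Take the graph 0→1 (weight 1), 0→2 (4), 1→2 (2), 2→3 (1), 1→3 (5) with source 0. Here run() returns [4, 2, 2, 1]. Every vertex computes in superstep 0; after that, only the vertices that received messages run compute().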




1. The org.apache.giraph.partition.PartitionStats class: add a computedVertexCount field, plus methods to increment and read it.


```java
/** computed vertices in this partition */
private long computedVertexCount = 0;

/**
 * Increment the computed vertex count by one.
 */
public void incrComputedVertexCount() {
  ++computedVertexCount;
}

/**
 * @return the computedVertexCount
 */
public long getComputedVertexCount() {
  return computedVertexCount;
}
```


Then serialize the new field in readFields() and write(), appending it at the same position in both:

```java
@Override
public void readFields(DataInput input) throws IOException {
  partitionId = input.readInt();
  vertexCount = input.readLong();
  finishedVertexCount = input.readLong();
  edgeCount = input.readLong();
  messagesSentCount = input.readLong();
  // Added: read the new field
  computedVertexCount = input.readLong();
}

@Override
public void write(DataOutput output) throws IOException {
  output.writeInt(partitionId);
  output.writeLong(vertexCount);
  output.writeLong(finishedVertexCount);
  output.writeLong(edgeCount);
  output.writeLong(messagesSentCount);
  // Added: write the new field
  output.writeLong(computedVertexCount);
}
```
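Writable serialization is positional. readFields() must read the fields in exactly the order write() emitted them, which is why the new field is appended at the end of both methods. The sketch below checks the round trip with a hypothetical stand-in class, PartitionStatsRoundTrip, using only java.io DataOutputStream and DataInputStream. It is not Giraph's PartitionStats.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for PartitionStats, not the Giraph class: it mirrors the
// patched write()/readFields() pair so the round trip can be checked without Giraph.
class PartitionStatsRoundTrip {
  int partitionId;
  long vertexCount;
  long finishedVertexCount;
  long edgeCount;
  long messagesSentCount;
  long computedVertexCount; // the newly added field

  void write(DataOutput output) throws IOException {
    output.writeInt(partitionId);
    output.writeLong(vertexCount);
    output.writeLong(finishedVertexCount);
    output.writeLong(edgeCount);
    output.writeLong(messagesSentCount);
    output.writeLong(computedVertexCount); // appended last ...
  }

  void readFields(DataInput input) throws IOException {
    partitionId = input.readInt();
    vertexCount = input.readLong();
    finishedVertexCount = input.readLong();
    edgeCount = input.readLong();
    messagesSentCount = input.readLong();
    computedVertexCount = input.readLong(); // ... so it is read last
  }

  /** Serializes this object to bytes and reads the bytes back into a new instance. */
  PartitionStatsRoundTrip roundTrip() {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      write(new DataOutputStream(bytes));
      PartitionStatsRoundTrip copy = new PartitionStatsRoundTrip();
      copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
      return copy;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Suppose the two methods disagreed, for example writing computedVertexCount last but reading it before messagesSentCount. The reader would then silently assign the wrong longs to the fields rather than throw, so this kind of mistake is easy to miss.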
2. The org.apache.giraph.graph.GlobalStats class: add the same field, which holds the total over all partitions.


```java
/**
 * computed vertices across all partitions
 * Added by BaiSong
 */
private long computedVertexCount = 0;

/**
 * @return the computedVertexCount
 */
public long getComputedVertexCount() {
  return computedVertexCount;
}
```

Modify the addPartitionStats(PartitionStats partitionStats) method so that it also accumulates computedVertexCount.

```java
/**
 * Add the stats of a partition to the global stats.
 *
 * @param partitionStats Partition stats to be added.
 */
public void addPartitionStats(PartitionStats partitionStats) {
  this.vertexCount += partitionStats.getVertexCount();
  this.finishedVertexCount += partitionStats.getFinishedVertexCount();
  this.edgeCount += partitionStats.getEdgeCount();
  // Added by BaiSong: the statement below
  this.computedVertexCount += partitionStats.getComputedVertexCount();
}
```


Also include the new field in toString():

```java
@Override
public String toString() {
  return "(vtx=" + vertexCount + ", computedVertexCount="
      + computedVertexCount + ",finVtx=" + finishedVertexCount
      + ",edges=" + edgeCount + ",msgCount=" + messageCount
      + ",haltComputation=" + haltComputation + ")";
}
```
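Below is a minimal sketch of the aggregation. It uses a hypothetical GlobalStatsSketch class with a nested PartitionCounts stand-in instead of the real Giraph classes. It assumes that the master builds a fresh global stats object from the workers' partition stats every superstep; this is an assumption about the master, not something shown in this post. Under that assumption the total needs no reset between supersteps. Each vertex belongs to exactly one partition and computes at most once per superstep, so the summed computedVertexCount can never exceed vertexCount.

```java
import java.util.List;

// Hypothetical stand-in for GlobalStats, not the Giraph class: it sums the
// per-partition counts the way the patched addPartitionStats() does.
class GlobalStatsSketch {
  /** Minimal per-partition counts (stand-in for PartitionStats). */
  static final class PartitionCounts {
    final long vertexCount;
    final long finishedVertexCount;
    final long edgeCount;
    final long computedVertexCount;

    PartitionCounts(long vertices, long finished, long edges, long computed) {
      vertexCount = vertices;
      finishedVertexCount = finished;
      edgeCount = edges;
      computedVertexCount = computed;
    }
  }

  long vertexCount;
  long finishedVertexCount;
  long edgeCount;
  long computedVertexCount;

  void addPartitionStats(PartitionCounts p) {
    vertexCount += p.vertexCount;
    finishedVertexCount += p.finishedVertexCount;
    edgeCount += p.edgeCount;
    computedVertexCount += p.computedVertexCount; // the added line
  }

  /** Builds a fresh total from one superstep's partition counts (assumed usage). */
  static GlobalStatsSketch aggregate(List<PartitionCounts> partitions) {
    GlobalStatsSketch global = new GlobalStatsSketch();
    for (PartitionCounts p : partitions) {
      global.addPartitionStats(p);
    }
    return global;
  }

  @Override
  public String toString() {
    return "(vtx=" + vertexCount + ", computedVertexCount=" + computedVertexCount
        + ",finVtx=" + finishedVertexCount + ",edges=" + edgeCount + ")";
  }
}
```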
3. The org.apache.giraph.graph.ComputeCallable class: count every vertex that runs compute().


```java
if (!vertex.isHalted()) {
  context.progress();
  TimerContext computeOneTimerContext = computeOneTimer.time();
  try {
    vertex.compute(messages);
    // Added: once the vertex has finished compute(),
    // increment this partition's computedVertexCount by one
    partitionStats.incrComputedVertexCount();
  } finally {
    computeOneTimerContext.stop();
  }
  // ……
```
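Why can incrComputedVertexCount() use a plain ++ without synchronization? Assume, as I read ComputeCallable, that every partition gets its own PartitionStats object and only one compute thread works on a given partition. Then the counter is confined to that thread and is combined only after the threads finish. The hypothetical PartitionCountingSketch below imitates this with an ExecutorService. It is not Giraph code. The input halted[p][v] marks vertices that are halted and have no messages, so they skip compute().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch, not Giraph code: each task owns the stats object of the
// partition it computes, so a plain ++ needs no lock; totals are combined
// only after the tasks have finished.
class PartitionCountingSketch {
  /** Per-partition counter, touched by exactly one thread. */
  static final class Stats {
    long computedVertexCount;

    void incrComputedVertexCount() {
      ++computedVertexCount;
    }
  }

  /** halted[p][v] is true if vertex v of partition p is halted and has no messages. */
  static long countComputed(boolean[][] halted, int threads) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<Stats>> futures = new ArrayList<>();
      for (boolean[] partition : halted) {
        futures.add(pool.submit(() -> {
          Stats stats = new Stats(); // fresh, thread-confined stats object
          for (boolean isHalted : partition) {
            if (!isHalted) {
              // vertex.compute(messages) would run here
              stats.incrComputedVertexCount();
            }
          }
          return stats;
        }));
      }
      long total = 0;
      for (Future<Stats> f : futures) {
        total += f.get().computedVertexCount; // get() makes the task's writes visible
      }
      return total;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      pool.shutdown();
    }
  }
}
```

The java.util.concurrent memory-consistency rules state that actions in the task happen-before the return of Future.get(). Reading computedVertexCount after get() is therefore safe without volatile or locks.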
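4. Add Hadoop Counters statistics. This works like my earlier post, Giraph Source Code Analysis (7): Adding Message Statistics, so I won't go into detail here. The added class is org.apache.giraph.counters.GiraphComputedVertex; its source code follows: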
```java
package org.apache.giraph.counters;

import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.mapreduce.Mapper.Context;

import com.google.common.collect.Maps;

/**
 * Hadoop Counters in group "Giraph Computed Vertex" for counting, in every
 * superstep, the number of vertices that ran compute().
 */
public class GiraphComputedVertex extends HadoopCountersBase {
  /** Counter group name for the computed-vertex counters */
  public static final String GROUP_NAME = "Giraph Computed Vertex";
  /** Singleton instance for everyone to use */
  private static GiraphComputedVertex INSTANCE;
  /** Computed-vertex counter for each superstep */
  private final Map<Long, GiraphHadoopCounter> superstepVertexCount;

  private GiraphComputedVertex(Context context) {
    super(context, GROUP_NAME);
    superstepVertexCount = Maps.newHashMap();
  }

  /**
   * Instantiate with Hadoop Context.
   *
   * @param context Hadoop Context to use.
   */
  public static void init(Context context) {
    INSTANCE = new GiraphComputedVertex(context);
  }

  /**
   * Get singleton instance.
   *
   * @return singleton GiraphComputedVertex instance.
   */
  public static GiraphComputedVertex getInstance() {
    return INSTANCE;
  }

  /**
   * Get the computed-vertex counter for a superstep, creating it on first use.
   *
   * @param superstep superstep number
   * @return counter for that superstep
   */
  public GiraphHadoopCounter getSuperstepVertexCount(long superstep) {
    GiraphHadoopCounter counter = superstepVertexCount.get(superstep);
    if (counter == null) {
      String counterPrefix = "Superstep: " + superstep + " ";
      counter = getCounter(counterPrefix);
      superstepVertexCount.put(superstep, counter);
    }
    return counter;
  }

  @Override
  public Iterator<GiraphHadoopCounter> iterator() {
    return superstepVertexCount.values().iterator();
  }
}
```
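getSuperstepVertexCount() creates one counter per superstep on first use and caches it in a map. The sketch below shows the same lazy pattern without Hadoop. SuperstepCounterSketch and its nested Counter class are hypothetical stand-ins for GiraphComputedVertex and GiraphHadoopCounter. The sketch uses Map.computeIfAbsent with a TreeMap, so iteration follows superstep order; the original uses a HashMap, whose iteration order is unspecified.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in, no Hadoop dependency: the same lazy
// "one named counter per superstep" pattern as getSuperstepVertexCount(),
// with a plain long standing in for a Hadoop counter value.
class SuperstepCounterSketch {
  /** Minimal counter: a display name plus a value. */
  static final class Counter {
    final String name;
    long value;

    Counter(String name) {
      this.name = name;
    }
  }

  // TreeMap keeps the counters in superstep order when iterated
  private final Map<Long, Counter> superstepVertexCount = new TreeMap<>();

  /** Returns the counter for a superstep, creating it on first use. */
  Counter getSuperstepVertexCount(long superstep) {
    return superstepVertexCount.computeIfAbsent(
        superstep, s -> new Counter("Superstep: " + s + " "));
  }

  Iterable<Counter> counters() {
    return superstepVertexCount.values();
  }
}
```

Presumably, as in part (7), the value recorded for a superstep would be GlobalStats.getComputedVertexCount() once that superstep finishes. That wiring is not shown in this post.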
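5. Experimental results. After the program runs, it prints to the terminal the total number of vertices that took part in computation in each iteration. I tested SSSP (the SimpleShortestPathsVertex class) on an input graph with 9 vertices and 12 edges. The output is as follows: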


