
Giraph Source Code Analysis (7): Adding Message Statistics

Published: 2024-11-11. Posted by: 千家信息网 editor

Author | 白松

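1. Add a class that writes the number of messages sent in each superstep to a Hadoop Counter. Create a new class, GiraphMessages, in the org.apache.giraph.counters package to record the message counts.

The source code is as follows: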

package org.apache.giraph.counters;

import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.mapreduce.Mapper.Context;

import com.google.common.collect.Maps;

/**
 * Hadoop Counters in group "Giraph Messages" for counting every superstep
 * message count.
 */
public class GiraphMessages extends HadoopCountersBase {
    /** Counter group name for the giraph Messages */
    public static final String GROUP_NAME = "Giraph Messages";

    /** Singleton instance for everyone to use */
    private static GiraphMessages INSTANCE;

    /** Message counter for each superstep, keyed by superstep number */
    private final Map<Long, GiraphHadoopCounter> superstepMessages;

    private GiraphMessages(Context context) {
        super(context, GROUP_NAME);
        superstepMessages = Maps.newHashMap();
    }

    /**
     * Instantiate with Hadoop Context.
     *
     * @param context Hadoop Context to use.
     */
    public static void init(Context context) {
        INSTANCE = new GiraphMessages(context);
    }

    /**
     * Get singleton instance.
     *
     * @return singleton GiraphMessages instance.
     */
    public static GiraphMessages getInstance() {
        return INSTANCE;
    }

    /**
     * Get counter for superstep messages, creating it on first use.
     *
     * @param superstep superstep number
     * @return counter holding the message count of that superstep
     */
    public GiraphHadoopCounter getSuperstepMessages(long superstep) {
        GiraphHadoopCounter counter = superstepMessages.get(superstep);
        if (counter == null) {
            String counterPrefix = "Superstep- " + superstep + " ";
            counter = getCounter(counterPrefix);
            superstepMessages.put(superstep, counter);
        }
        return counter;
    }

    @Override
    public Iterator<GiraphHadoopCounter> iterator() {
        return superstepMessages.values().iterator();
    }
}
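The get-or-create lookup in getSuperstepMessages can be tried out without a Hadoop cluster. The following is only a minimal sketch of that pattern, not Giraph code: the class name SuperstepMessageCounters is hypothetical, and a plain AtomicLong stands in for GiraphHadoopCounter as an assumption so that the example runs on its own.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for GiraphMessages that needs no Hadoop Context. */
class SuperstepMessageCounters implements Iterable<AtomicLong> {
    /** One counter per superstep, created lazily on first access. */
    private final Map<Long, AtomicLong> superstepMessages = new TreeMap<>();

    /** Return the counter for a superstep, creating it if it does not exist yet. */
    public AtomicLong getSuperstepMessages(long superstep) {
        return superstepMessages.computeIfAbsent(superstep, s -> new AtomicLong());
    }

    /** Number of supersteps that have a counter so far. */
    public int size() {
        return superstepMessages.size();
    }

    @Override
    public Iterator<AtomicLong> iterator() {
        return superstepMessages.values().iterator();
    }

    public static void main(String[] args) {
        SuperstepMessageCounters counters = new SuperstepMessageCounters();
        counters.getSuperstepMessages(0).addAndGet(5);
        counters.getSuperstepMessages(0).addAndGet(7); // same counter is reused
        counters.getSuperstepMessages(1).addAndGet(3);
        for (AtomicLong counter : counters) {
            System.out.println(counter.get());
        }
    }
}
```

Repeated calls with the same superstep number return the same counter object, which is why the master can simply call increment on the result each time.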

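2. Add the statistics to the BspServiceMaster class. At every synchronization, the master aggregates (sums) the number of messages sent by each worker and stores the result in GlobalStats. So it is enough, after each synchronization, to read the total message count from the GlobalStats object and write it into GiraphMessages. The records, keyed by superstep, are actually stored in the Map superstepMessages object defined in the GiraphMessages class from the previous step.

At the very end of the BspServiceMaster constructor, append one line of code to initialize GiraphMessages.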

GiraphMessages.init(context);

In the SuperstepState coordinateSuperstep() method of the BspServiceMaster class, add the recording logic. The code fragment is as follows:

……
// If the master is halted or all the vertices voted to halt and there
// are no more messages in the system, stop the computation
GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
LOG.info("D-globalStats: " + globalStats + "\n\n");
// Add the statement below. Recording starts from superstep 0.
if (getSuperstep() != INPUT_SUPERSTEP) {
    GiraphMessages.getInstance().getSuperstepMessages(getSuperstep())
        .increment(globalStats.getMessageCount());
}
……
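The flow on the master side (sum the per-worker message counts, skip the input superstep, then add the total to that superstep's counter) can be illustrated in isolation. This is a rough sketch under stated assumptions rather than the actual BspServiceMaster logic: MasterMessageRecorder and its methods are hypothetical names, a plain map replaces the Hadoop counters, and INPUT_SUPERSTEP is assumed to be -1 here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of the master recording message totals per superstep. */
class MasterMessageRecorder {
    /** Assumed marker for the input superstep, which is not recorded. */
    static final long INPUT_SUPERSTEP = -1;

    /** Total messages per superstep; stands in for the Hadoop counters. */
    private final Map<Long, Long> messagesPerSuperstep = new TreeMap<>();

    /** Sum the message counts reported by all workers. */
    static long aggregate(List<Long> workerMessageCounts) {
        long total = 0;
        for (long count : workerMessageCounts) {
            total += count;
        }
        return total;
    }

    /** Record the aggregated count, starting from superstep 0. */
    void record(long superstep, List<Long> workerMessageCounts) {
        if (superstep == INPUT_SUPERSTEP) {
            return; // nothing is recorded for the input superstep
        }
        messagesPerSuperstep.merge(superstep, aggregate(workerMessageCounts), Long::sum);
    }

    /** Recorded total for a superstep, or 0 if nothing was recorded. */
    long get(long superstep) {
        return messagesPerSuperstep.getOrDefault(superstep, 0L);
    }

    public static void main(String[] args) {
        MasterMessageRecorder recorder = new MasterMessageRecorder();
        recorder.record(INPUT_SUPERSTEP, Arrays.asList(10L, 20L)); // skipped
        recorder.record(0, Arrays.asList(3L, 4L));
        recorder.record(1, Arrays.asList(5L, 6L, 7L));
        System.out.println(recorder.get(0) + " " + recorder.get(1));
    }
}
```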

3. The experimental results are as follows:

The end!
