
How to Use the Direct Grouping Strategy in Storm


This article introduces how to use the Direct Grouping strategy in Storm. Many people have questions about this in day-to-day work, so this walkthrough pulls together a simple, practical approach and works through a complete example. Hopefully it resolves those doubts; let's get started!

Using the Direct Grouping strategy, we send all words that share the same first letter to the same task for counting.
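The routing rule itself is simple: uppercase the word's first letter and take it modulo the number of downstream tasks. Here is a minimal standalone sketch of that rule (the class name RoutingDemo and the sample words are ours, for illustration only; they are not part of the project):

public class RoutingDemo {
    public static void main(String[] args) {
        int numTasks = 2; // matches the count bolt's parallelism in this topology
        String[] words = {"Storm", "to", "is", "data"};
        for (String word : words) {
            // same computation as getWordCountIndex() in the split bolt below
            int index = word.trim().toUpperCase().charAt(0) % numTasks;
            System.out.println(word + " -> task index " + index);
        }
    }
}

Running it prints task index 1 for "Storm" and "is", and 0 for "to" and "data": words with the same first letter always land on the same index.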
The data source spout:

package com.zhch.v3;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class SentenceSpout extends BaseRichSpout {
    private FileReader fileReader = null;
    private boolean completed = false;
    private ConcurrentHashMap<UUID, Values> pending; // emitted tuples awaiting ack
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
        try {
            this.fileReader = new FileReader(map.get("wordsFile").toString());
        } catch (Exception e) {
            throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]");
        }
    }

    @Override
    public void nextTuple() {
        if (completed) {
            // the file has been fully consumed; sleep instead of busy-spinning
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            return;
        }
        String line;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((line = reader.readLine()) != null) {
                Values values = new Values(line);
                UUID msgId = UUID.randomUUID();
                this.pending.put(msgId, values);
                // emitting with a message id anchors the tuple so ack/fail are called back
                this.collector.emit(values, msgId);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        // tuple fully processed downstream; stop tracking it
        this.pending.remove(msgId);
    }

    @Override
    public void fail(Object msgId) {
        // tuple failed somewhere downstream; replay it
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}
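The pending map and the two-argument emit are what switch on Storm's reliability tracking: anchored tuples are acked or failed back to this spout, and fail() simply replays them. If replays ever pile up, a common knob is to cap the number of unacked tuples in flight per spout task; a hedged sketch, where the value 1000 is an arbitrary example and not taken from the original article:

Config config = new Config();
// limit how many unacked tuples each spout task may have in flight at once
config.setMaxSpoutPending(1000);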

Implementing the sentence-splitting bolt:

package com.zhch.v3;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.List;
import java.util.Map;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;
    private List<Integer> numCounterTasks;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        // look up the taskId list of the downstream count bolt
        this.numCounterTasks = topologyContext.getComponentTasks(WordCountTopology.COUNT_BOLT_ID);
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            // pick the target task ourselves and send the tuple straight to it
            Integer taskId = this.numCounterTasks.get(this.getWordCountIndex(word));
            collector.emitDirect(taskId, tuple, new Values(word));
        }
        this.collector.ack(tuple);
    }

    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        if (word.isEmpty()) {
            return 0;
        } else {
            // first letter modulo the number of downstream tasks, so words
            // sharing a first letter always map to the same task
            return word.charAt(0) % numCounterTasks.size();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
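One detail worth hedging on: Storm's documentation states that direct groupings can only be declared on streams that have been declared as direct streams. The code above declares a plain stream and still calls emitDirect; if your Storm version rejects that, mark the stream direct with the boolean overload of declare. A sketch, not taken from the original article:

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    // true marks the "word" stream as a direct stream, which emitDirect expects
    outputFieldsDeclarer.declare(true, new Fields("word"));
}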

Implementing the word-count bolt:

package com.zhch.v3;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);

        // rewrite the full count table to a local file on every update
        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt"));
            Iterator<String> keys = this.counts.keySet().iterator();
            while (keys.hasNext()) {
                String w = keys.next();
                Long c = this.counts.get(w);
                writer.write(w + " : " + c);
                writer.newLine();
                writer.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                writer = null;
            }
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}
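A caveat with this bolt: both count tasks write to the same hard-coded path, which only works here because the two tasks happen to land on different hosts (hadoop5 and hadoop6, as the results below show). If two tasks ever share a host they will overwrite each other's file. A hedged sketch of one fix, deriving a per-task path from the task id in prepare() (assumes a new String outputPath field used in place of the literal path; the path pattern is illustrative):

@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    this.collector = outputCollector;
    this.counts = new HashMap<String, Long>();
    // one output file per task, e.g. result-5.txt, so co-located tasks cannot collide
    this.outputPath = "/home/grid/stormData/result-" + topologyContext.getThisTaskId() + ".txt";
}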

Implementing the word-count topology:

package com.zhch.v3;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class WordCountTopology {
    public static final String SENTENCE_SPOUT_ID = "sentence-spout";
    public static final String SPLIT_BOLT_ID = "split-bolt";
    public static final String COUNT_BOLT_ID = "count-bolt";
    public static final String TOPOLOGY_NAME = "word-count-topology-v3";

    public static void main(String[] args) throws Exception {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
                .directGrouping(SPLIT_BOLT_ID); // use the Direct Grouping strategy

        Config config = new Config();
        config.put("wordsFile", args[0]);
        if (args.length > 1) {
            config.setNumWorkers(2);
            // submit to the cluster
            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
        } else {
            // local mode: run briefly, then shut down
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}

Submit to the Storm cluster (the first argument is the input file path, the second the topology name):

storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v3.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v3

Run results:

[grid@hadoop5 stormData]$ cat result.txt
second : 1
can : 1
set : 1
simple : 1
use : 2
unbounded : 1
used : 1
It : 1
Storm : 4
online : 1
cases: : 1
open : 1
Apache : 1
of : 2
over : 1
more : 1
clocked : 1
easy : 2
scalable : 1
any : 1
guarantees : 1
ETL : 1
million : 1
continuous : 1
is : 6
with : 1
it : 2
makes : 1
your : 1
a : 4
at : 1
machine : 1
analytics : 1
up : 1
and : 5
many : 1
system : 1
source : 1
what : 1
operate : 1
will : 1
computation : 2
streams : 1

[grid@hadoop6 stormData]$ cat result.txt
to : 3
for : 2
data : 2
distributed : 2
has : 1
free : 1
programming : 1
reliably : 1
fast: : 1
processing : 2
be : 2
Hadoop : 1
did : 1
fun : 1
learning : 1
torm : 1
process : 1
RPC : 1
node : 1
processed : 2
per : 2
realtime : 3
benchmark : 1
batch : 1
doing : 1
lot : 1
language : 1
tuples : 1
fault-tolerant : 1

Notice that the two files partition the vocabulary by first letter: every word in a given file maps to that task's index under the first-letter rule, confirming that emitDirect routed same-initial words to the same counting task.

That wraps up our look at how to use the Direct Grouping strategy in Storm; hopefully it has cleared up the common questions. Pairing the theory with hands-on practice is the best way to make it stick, so give it a try! For more practical articles like this one, keep following the site.
