千家信息网

Storm怎么实现单词计数

发表于:2025-01-22 作者:千家信息网编辑
千家信息网最后更新 2025年01月22日,本篇内容主要讲解"Storm怎么实现单词计数",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Storm怎么实现单词计数"吧!在上一次单词计数的基础上做如下改动。
千家信息网最后更新 2025年01月22日Storm怎么实现单词计数

本篇内容主要讲解"Storm怎么实现单词计数",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Storm怎么实现单词计数"吧!

在上一次单词计数的基础上做如下改动: 使用 自定义 分组策略,将首字母相同的单词发送给同一个task计数
自定义 CustomStreamGrouping

package com.zhch.v4;

import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * Custom stream grouping that sends all tuples whose first field starts with
 * the same character to the same target task (first char modulo task count),
 * so words sharing an initial letter are counted by one task.
 */
public class ModuleGrouping implements CustomStreamGrouping, Serializable {

    private static final long serialVersionUID = 1L;

    /** Task ids of the consuming bolt, captured in prepare(). */
    private List<Integer> tasks;

    @Override
    public void prepare(WorkerTopologyContext workerContext, GlobalStreamId streamId,
                        List<Integer> targetTasks) {
        this.tasks = targetTasks;
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> taskIds = new ArrayList<Integer>();
        if (!values.isEmpty()) {
            // String.valueOf tolerates a null field (original would NPE on toString()).
            String str = String.valueOf(values.get(0));
            if (str.isEmpty()) {
                // BUG FIX: the original added the literal value 0, which is not
                // necessarily a valid task id for this bolt; route empty strings
                // to the first *target* task instead.
                taskIds.add(tasks.get(0));
            } else {
                // Hash on the first character so same-initial words co-locate.
                int index = str.charAt(0) % tasks.size();
                taskIds.add(tasks.get(index));
            }
        }
        return taskIds;
    }
}

数据源spout

package com.zhch.v4;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Reliable spout that emits one "sentence" tuple per line of the file named
 * by the "wordsFile" config entry. Emitted tuples are tracked in {@code pending}
 * keyed by a random UUID so failed tuples can be replayed in fail().
 */
public class SentenceSpout extends BaseRichSpout {

    private FileReader fileReader = null;
    // Set once the file has been fully emitted; nextTuple() then just backs off.
    private boolean completed = false;
    // Tuples emitted but not yet acked, keyed by message id (for replay on fail).
    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext,
                     SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
        Object wordsFile = map.get("wordsFile");
        try {
            this.fileReader = new FileReader(wordsFile.toString());
        } catch (Exception e) {
            // BUG FIX: preserve the cause so the underlying error is not lost.
            throw new RuntimeException("Error reading file [" + wordsFile + "]", e);
        }
    }

    @Override
    public void nextTuple() {
        if (completed) {
            // File already drained: back off briefly so we don't busy-spin.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore interrupt status
            }
            // BUG FIX: original fell through and wrapped the exhausted reader
            // in a new BufferedReader on every call.
            return;
        }
        String line;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((line = reader.readLine()) != null) {
                Values values = new Values(line);
                UUID msgId = UUID.randomUUID();
                this.pending.put(msgId, values);
                // Anchored emit: Storm calls ack()/fail() with this msgId.
                this.collector.emit(values, msgId);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId);
    }

    @Override
    public void fail(Object msgId) {
        Values values = this.pending.get(msgId);
        // BUG FIX: guard against an unknown/already-acked id (original NPE'd
        // inside collector.emit when pending.get returned null).
        if (values != null) {
            this.collector.emit(values, msgId);
        }
    }
}

实现语句分割bolt

package com.zhch.v4;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

/**
 * Splits each incoming "sentence" tuple on single spaces and emits one
 * "word" tuple per token, anchored to the input tuple, then acks the input.
 */
public class SplitSentenceBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext,
                        OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        for (String token : sentence.split(" ")) {
            // Anchor each word to the source sentence for reliability tracking.
            this.collector.emit(tuple, new Values(token));
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

实现单词计数bolt

package com.zhch.v4;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Counts words per task and, after every update, rewrites this task's full
 * count snapshot (sorted by word) to /home/grid/stormData/result.txt.
 *
 * NOTE(review): the bolt declares output fields ("word", "count") but never
 * emits downstream — the file is its only output. Confirm whether the
 * declaration is intentional or leftover.
 */
public class WordCountBolt extends BaseRichBolt {

    private OutputCollector collector;
    // Per-task running counts; safe as plain HashMap (one executor thread).
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext,
                        OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);

        // Rewrite the whole snapshot (FileWriter truncates the file).
        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt"));
            List<String> keys = new ArrayList<String>(this.counts.keySet());
            Collections.sort(keys);
            for (String key : keys) {
                writer.write(key + " : " + this.counts.get(key));
                writer.newLine();
            }
            // PERF FIX: flush once after the loop instead of per line.
            writer.flush();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}

实现单词计数topology

package com.zhch.v4;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.topology.TopologyBuilder;public class WordCountTopology {    public static final String SENTENCE_SPOUT_ID = "sentence-spout";    public static final String SPLIT_BOLT_ID = "split-bolt";    public static final String COUNT_BOLT_ID = "count-bolt";    public static final String TOPOLOGY_NAME = "word-count-topology-v4";    public static void main(String[] args) throws Exception {        SentenceSpout spout = new SentenceSpout();        SplitSentenceBolt spiltBolt = new SplitSentenceBolt();        WordCountBolt countBolt = new WordCountBolt();        TopologyBuilder builder = new TopologyBuilder();        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);        builder.setBolt(SPLIT_BOLT_ID, spiltBolt, 2).setNumTasks(4)                .shuffleGrouping(SENTENCE_SPOUT_ID);        builder.setBolt(COUNT_BOLT_ID, countBolt, 2)                .customGrouping(SPLIT_BOLT_ID, new ModuleGrouping()); //使用 自定义 分组策略        Config config = new Config();        config.put("wordsFile", args[0]);        if (args != null && args.length > 1) {            config.setNumWorkers(2);            //集群模式启动            StormSubmitter.submitTopology(args[1], config, builder.createTopology());        } else {            LocalCluster cluster = new LocalCluster();            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());            try {                Thread.sleep(5 * 1000);            } catch (InterruptedException e) {            }            cluster.killTopology(TOPOLOGY_NAME);            cluster.shutdown();        }    }}

提交到Storm集群

storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v4.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v4

运行结果:

[grid@hadoop5 stormData]$ cat result.txt Apache : 1ETL : 1It : 1Storm : 4a : 4analytics : 1and : 5any : 1at : 1can : 1cases: : 1clocked : 1computation : 2continuous : 1easy : 2guarantees : 1is : 6it : 2machine : 1makes : 1many : 1million : 1more : 1of : 2online : 1open : 1operate : 1over : 1scalable : 1second : 1set : 1simple : 1source : 1streams : 1system : 1unbounded : 1up : 1use : 2used : 1what : 1will : 1with : 1your : 1[grid@hadoop6 stormData]$ cat result.txt Hadoop : 1RPC : 1batch : 1be : 2benchmark : 1data : 2did : 1distributed : 2doing : 1fast: : 1fault-tolerant : 1for : 2free : 1fun : 1has : 1language : 1learning : 1lot : 1node : 1per : 2process : 1processed : 2processing : 2programming : 1realtime : 3reliably : 1to : 3torm : 1tuples : 1

到此,相信大家对"Storm怎么实现单词计数"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0