Storm怎么实现单词计数
发表于:2025-02-23 作者:千家信息网编辑
千家信息网最后更新 2025年02月23日,本篇内容主要讲解"Storm怎么实现单词计数",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Storm怎么实现单词计数"吧!在上一次单词计数的基础上做如下改动:使用自定义分组策略,将首字母相同的单词发送给同一个 task 计数。
千家信息网最后更新 2025年02月23日Storm怎么实现单词计数
本篇内容主要讲解"Storm怎么实现单词计数",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Storm怎么实现单词计数"吧!
在上一次单词计数的基础上做如下改动:使用自定义分组策略(CustomStreamGrouping),将首字母相同的单词发送给同一个 task 进行计数。
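自定义分组策略 ModuleGrouping(实现 CustomStreamGrouping 接口)。注:原文中该代码清单不完整,在 chooseTasks 方法处被截断;另外本文所有代码清单中的泛型参数(如 List<Integer>)在页面转换时都已丢失。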
package com.zhch.v4;import backtype.storm.generated.GlobalStreamId;import backtype.storm.grouping.CustomStreamGrouping;import backtype.storm.task.WorkerTopologyContext;import java.io.Serializable;import java.util.ArrayList;import java.util.List;public class ModuleGrouping implements CustomStreamGrouping, Serializable { private Listtasks; @Override public void prepare(WorkerTopologyContext workerContext, GlobalStreamId streamId, List targetTasks) { this.tasks = targetTasks; } @Override public List chooseTasks(int taskId, List
数据源spout
package com.zhch.v4;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import java.io.BufferedReader;import java.io.FileReader;import java.util.Map;import java.util.UUID;import java.util.concurrent.ConcurrentHashMap;public class SentenceSpout extends BaseRichSpout { private FileReader fileReader = null; private boolean completed = false; private ConcurrentHashMappending; private SpoutOutputCollector collector; @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("sentence")); } @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.collector = spoutOutputCollector; this.pending = new ConcurrentHashMap (); try { this.fileReader = new FileReader(map.get("wordsFile").toString()); } catch (Exception e) { throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]"); } } @Override public void nextTuple() { if (completed) { try { Thread.sleep(1000); } catch (InterruptedException e) { } } String line; BufferedReader reader = new BufferedReader(fileReader); try { while ((line = reader.readLine()) != null) { Values values = new Values(line); UUID msgId = UUID.randomUUID(); this.pending.put(msgId, values); this.collector.emit(values, msgId); } } catch (Exception e) { throw new RuntimeException("Error reading tuple", e); } finally { completed = true; } } @Override public void ack(Object msgId) { this.pending.remove(msgId); } @Override public void fail(Object msgId) { this.collector.emit(this.pending.get(msgId), msgId); }}
实现语句分割bolt
package com.zhch.v4;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.Map;public class SplitSentenceBolt extends BaseRichBolt { private OutputCollector collector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; } @Override public void execute(Tuple tuple) { String sentence = tuple.getStringByField("sentence"); String[] words = sentence.split(" "); for (String word : words) { collector.emit(tuple, new Values(word)); } this.collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); }}
实现单词计数bolt
package com.zhch.v4;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import java.io.BufferedWriter;import java.io.FileWriter;import java.util.ArrayList;import java.util.Collections;import java.util.HashMap;import java.util.List;import java.util.Map;public class WordCountBolt extends BaseRichBolt { private OutputCollector collector; private HashMapcounts = null; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; this.counts = new HashMap (); } @Override public void execute(Tuple tuple) { String word = tuple.getStringByField("word"); Long count = this.counts.get(word); if (count == null) { count = 0L; } count++; this.counts.put(word, count); BufferedWriter writer = null; try { writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt")); List keys = new ArrayList (); keys.addAll(this.counts.keySet()); Collections.sort(keys); for (String key : keys) { Long c = this.counts.get(key); writer.write(key + " : " + c); writer.newLine(); writer.flush(); } } catch (Exception e) { e.printStackTrace(); } finally { if (writer != null) { try { writer.close(); } catch (Exception e) { e.printStackTrace(); } writer = null; } } this.collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word", "count")); }}
实现单词计数topology
package com.zhch.v4;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.topology.TopologyBuilder;public class WordCountTopology { public static final String SENTENCE_SPOUT_ID = "sentence-spout"; public static final String SPLIT_BOLT_ID = "split-bolt"; public static final String COUNT_BOLT_ID = "count-bolt"; public static final String TOPOLOGY_NAME = "word-count-topology-v4"; public static void main(String[] args) throws Exception { SentenceSpout spout = new SentenceSpout(); SplitSentenceBolt spiltBolt = new SplitSentenceBolt(); WordCountBolt countBolt = new WordCountBolt(); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SENTENCE_SPOUT_ID, spout, 2); builder.setBolt(SPLIT_BOLT_ID, spiltBolt, 2).setNumTasks(4) .shuffleGrouping(SENTENCE_SPOUT_ID); builder.setBolt(COUNT_BOLT_ID, countBolt, 2) .customGrouping(SPLIT_BOLT_ID, new ModuleGrouping()); //使用 自定义 分组策略 Config config = new Config(); config.put("wordsFile", args[0]); if (args != null && args.length > 1) { config.setNumWorkers(2); //集群模式启动 StormSubmitter.submitTopology(args[1], config, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); try { Thread.sleep(5 * 1000); } catch (InterruptedException e) { } cluster.killTopology(TOPOLOGY_NAME); cluster.shutdown(); } }}
提交到Storm集群
storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v4.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v4
运行结果(两个计数 task 分别运行在 hadoop5 和 hadoop6 上。由于首字母相同的单词被分给同一个 task,两台机器的 result.txt 中出现的单词首字母互不重叠):
[grid@hadoop5 stormData]$ cat result.txt
Apache : 1
ETL : 1
It : 1
Storm : 4
a : 4
analytics : 1
and : 5
any : 1
at : 1
can : 1
cases: : 1
clocked : 1
computation : 2
continuous : 1
easy : 2
guarantees : 1
is : 6
it : 2
machine : 1
makes : 1
many : 1
million : 1
more : 1
of : 2
online : 1
open : 1
operate : 1
over : 1
scalable : 1
second : 1
set : 1
simple : 1
source : 1
streams : 1
system : 1
unbounded : 1
up : 1
use : 2
used : 1
what : 1
will : 1
with : 1
your : 1

[grid@hadoop6 stormData]$ cat result.txt
Hadoop : 1
RPC : 1
batch : 1
be : 2
benchmark : 1
data : 2
did : 1
distributed : 2
doing : 1
fast: : 1
fault-tolerant : 1
for : 2
free : 1
fun : 1
has : 1
language : 1
learning : 1
lot : 1
node : 1
per : 2
process : 1
processed : 2
processing : 2
programming : 1
realtime : 3
reliably : 1
to : 3
torm : 1
tuples : 1
到此,相信大家对"Storm怎么实现单词计数"有了更深的了解,不妨来实际操作一番吧!这里是千家信息网,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
单词
内容
策略
集群
分组
学习
实用
更深
相同
兴趣
基础
字母
实用性
实际
操作简单
数据
数据源
方法
更多
朋友
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
服务器释放
华为企业云服务器配置
管理软件开发及作用
安装ug显示服务器无效
网络安全宣传手报
新浪cba数据库
3级数据库习题
按行业划分网络安全事件占比统计
软件开发技术提升阶段
battlebit无法连接主服务器
oracledrop数据库
mc服务器地址1.16.5
河北省软件开发基准人月费率
新 卡 无服务器
网校服务器托管
网警网络安全监察知识试题
金山区个性化软件开发诚信合作
计算机网络技术专业出来工作
浙江企业软件开发商家
华硕笔记本音频服务器未响应
网络安全教育图片大全简单
数据库系统工程师题库软题库
sql数据库无法克隆硬盘
关于部队网络安全的新闻
宇视服务器串口登陆设备
趣融网络技术有限公司
华为企业云服务器配置
td数据库合并函数
网络安全反诈防骗手抄报内容
云尊服务器