How to Implement Word Counting in Storm
Published: 2025-01-22 · Author: 千家信息网 editor
This article explains how to implement word counting in Storm. The approach is simple and practical, so feel free to follow along.
Building on the previous word-count example, we make one change: a custom stream grouping is used so that all words beginning with the same letter are sent to the same task for counting.
Implement the custom CustomStreamGrouping
package com.zhch.v4;

import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ModuleGrouping implements CustomStreamGrouping, Serializable {
    private List<Integer> tasks;

    @Override
    public void prepare(WorkerTopologyContext workerContext, GlobalStreamId streamId, List<Integer> targetTasks) {
        this.tasks = targetTasks;
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> taskIds = new ArrayList<Integer>();
        String word = values.get(0).toString();
        // Words with the same first letter always map to the same task:
        // the first character's code modulo the task count picks the index
        taskIds.add(tasks.get(word.charAt(0) % tasks.size()));
        return taskIds;
    }
}
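To see the routing rule in isolation, here is a minimal standalone sketch; the task IDs 3 and 4 are hypothetical, purely for illustration:

import java.util.Arrays;
import java.util.List;

public class GroupingDemo {
    public static void main(String[] args) {
        // Hypothetical target task IDs, as prepare() would receive them
        List<Integer> tasks = Arrays.asList(3, 4);
        for (String word : new String[]{"apple", "ant", "bear", "storm"}) {
            // Same rule as ModuleGrouping.chooseTasks: first letter's
            // character code modulo the number of tasks picks the task
            int task = tasks.get(word.charAt(0) % tasks.size());
            System.out.println(word + " -> task " + task);
        }
        // apple -> task 4 and ant -> task 4 ('a' = 97, odd);
        // bear -> task 3 ('b' = 98, even); storm -> task 4 ('s' = 115, odd)
    }
}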
Implement the data source spout
package com.zhch.v4;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class SentenceSpout extends BaseRichSpout {
    private FileReader fileReader = null;
    private boolean completed = false;
    // Tuples emitted but not yet acked, kept so they can be replayed
    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
        try {
            this.fileReader = new FileReader(map.get("wordsFile").toString());
        } catch (Exception e) {
            throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]");
        }
    }

    @Override
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
        }
        String line;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((line = reader.readLine()) != null) {
                Values values = new Values(line);
                UUID msgId = UUID.randomUUID();
                this.pending.put(msgId, values);
                this.collector.emit(values, msgId);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId);
    }

    @Override
    public void fail(Object msgId) {
        // Replay the failed tuple with the same message ID
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}
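Note the reliability mechanism here: every emitted sentence is stored in the pending map under a random message ID. When Storm calls ack() the entry is removed; when it calls fail() the stored tuple is re-emitted with the same ID, so every line of the input file is delivered at least once.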
Implement the sentence-splitting bolt
package com.zhch.v4;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            collector.emit(tuple, new Values(word));
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
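Each word is emitted with the incoming sentence tuple as its anchor (collector.emit(tuple, new Values(word))), so a failure anywhere downstream propagates back to the spout, which replays the whole sentence. For contrast, a minimal sketch of an unanchored emit, which would break that replay chain:

    collector.emit(new Values(word)); // unanchored: downstream failures never reach the spout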
Implement the word-count bolt
package com.zhch.v4;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);

        // Rewrite the full, sorted count file on every tuple
        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt"));
            List<String> keys = new ArrayList<String>();
            keys.addAll(this.counts.keySet());
            Collections.sort(keys);
            for (String key : keys) {
                Long c = this.counts.get(key);
                writer.write(key + " : " + c);
                writer.newLine();
                writer.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                writer = null;
            }
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}
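Two design points worth noting: the counts live in an in-memory HashMap that is private to each bolt task, and the bolt rewrites the entire sorted result file on every tuple. That is acceptable for a small demonstration, but since /home/grid/stormData/result.txt is a local path, each host running a counting task ends up with its own result file, as the output below shows.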
Implement the word-count topology
package com.zhch.v4;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class WordCountTopology {
    public static final String SENTENCE_SPOUT_ID = "sentence-spout";
    public static final String SPLIT_BOLT_ID = "split-bolt";
    public static final String COUNT_BOLT_ID = "count-bolt";
    public static final String TOPOLOGY_NAME = "word-count-topology-v4";

    public static void main(String[] args) throws Exception {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
                .customGrouping(SPLIT_BOLT_ID, new ModuleGrouping()); // use the custom grouping strategy

        Config config = new Config();
        config.put("wordsFile", args[0]);

        if (args != null && args.length > 1) {
            config.setNumWorkers(2);
            // launch in cluster mode
            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}
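A note on the arguments: args[0] is the input file path, handed to the spout through the wordsFile config entry. If a second argument (the topology name) is supplied, the topology is submitted to the cluster with two workers via StormSubmitter; otherwise it runs in a LocalCluster for five seconds and then shuts down.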
Submit to the Storm cluster
storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v4.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v4
Results:
[grid@hadoop5 stormData]$ cat result.txt
Apache : 1
ETL : 1
It : 1
Storm : 4
a : 4
analytics : 1
and : 5
any : 1
at : 1
can : 1
cases: : 1
clocked : 1
computation : 2
continuous : 1
easy : 2
guarantees : 1
is : 6
it : 2
machine : 1
makes : 1
many : 1
million : 1
more : 1
of : 2
online : 1
open : 1
operate : 1
over : 1
scalable : 1
second : 1
set : 1
simple : 1
source : 1
streams : 1
system : 1
unbounded : 1
up : 1
use : 2
used : 1
what : 1
will : 1
with : 1
your : 1

[grid@hadoop6 stormData]$ cat result.txt
Hadoop : 1
RPC : 1
batch : 1
be : 2
benchmark : 1
data : 2
did : 1
distributed : 2
doing : 1
fast: : 1
fault-tolerant : 1
for : 2
free : 1
fun : 1
has : 1
language : 1
learning : 1
lot : 1
node : 1
per : 2
process : 1
processed : 2
processing : 2
programming : 1
realtime : 3
reliably : 1
to : 3
torm : 1
tuples : 1
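The split between the two files matches the custom grouping. With two counting tasks, charAt(0) % 2 partitions words by the parity of the first letter's character code: everything on hadoop5 begins with an odd-coded letter ('a' = 97, 'c' = 99, ..., and likewise 'A', 'E', 'I', 'S'), while everything on hadoop6 begins with an even-coded one ('b' = 98, 'd' = 100, ..., 'H', 'R'), so each word is counted by exactly one task.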
That covers how to implement word counting in Storm. Now give it a try yourself!