How to Use the Direct Grouping Strategy in Storm
Published 2025-02-03
This article explains how to use the Direct Grouping strategy in Storm. The walkthrough below builds a complete word-count topology step by step, so follow along and try it yourself.
The example uses the Direct Grouping strategy to send all words that share the same first letter to the same task for counting.
The data source spout
package com.zhch.v3;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class SentenceSpout extends BaseRichSpout {
    private FileReader fileReader = null;
    private boolean completed = false;
    // Tuples that have been emitted but not yet acked, keyed by message id
    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
        try {
            this.fileReader = new FileReader(map.get("wordsFile").toString());
        } catch (Exception e) {
            throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]");
        }
    }

    @Override
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            return; // the file has already been consumed; just idle
        }
        String line;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((line = reader.readLine()) != null) {
                Values values = new Values(line);
                UUID msgId = UUID.randomUUID();
                this.pending.put(msgId, values);
                // Anchor the tuple to a message id so ack/fail are called back
                this.collector.emit(values, msgId);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId);
    }

    @Override
    public void fail(Object msgId) {
        // Re-emit failed tuples for at-least-once processing
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}
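Note the hand-rolled at-least-once pattern here: each line is emitted anchored to a random UUID message id and parked in the pending map; ack() removes it and fail() re-emits it. Also, nextTuple() drains the entire file in one call, which is fine for a demo, but on a real stream you would emit a single tuple per invocation.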
Implementing the sentence-splitting bolt
package com.zhch.v3;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.List;
import java.util.Map;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;
    private List<Integer> numCounterTasks;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        // Get the task id list of the downstream bolt
        this.numCounterTasks = topologyContext.getComponentTasks(WordCountTopology.COUNT_BOLT_ID);
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            Integer taskId = this.numCounterTasks.get(this.getWordCountIndex(word));
            // Send the word directly to the chosen downstream task
            collector.emitDirect(taskId, tuple, new Values(word));
        }
        this.collector.ack(tuple);
    }

    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        if (word.isEmpty())
            return 0;
        else {
            // First letter of the word, modulo the number of downstream tasks
            return word.charAt(0) % numCounterTasks.size();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
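To make the routing rule concrete, here is a minimal standalone sketch of the same first-letter-modulo logic (the class name TaskIndexDemo and the sample words are illustrative, not part of the topology):

package com.zhch.v3;

public class TaskIndexDemo {
    // Same rule as SplitSentenceBolt.getWordCountIndex: first letter of the
    // upper-cased word, modulo the number of downstream counter tasks.
    static int taskIndex(String word, int numTasks) {
        word = word.trim().toUpperCase();
        if (word.isEmpty()) return 0;
        return word.charAt(0) % numTasks;
    }

    public static void main(String[] args) {
        // With the 2 counter tasks configured in the topology below:
        // 'S' (83) % 2 = 1, 'T' (84) % 2 = 0, 'D' (68) % 2 = 0
        String[] words = {"storm", "Stream", "to", "data"};
        for (String w : words) {
            System.out.println(w + " -> task index " + taskIndex(w, 2));
        }
    }
}

Because the index depends only on the first letter, "storm" and "Stream" always land on the same counter task.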
Implementing the word-count bolt
package com.zhch.v3;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);

        // Rewrite the whole result file after every update
        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt"));
            Iterator<String> keys = this.counts.keySet().iterator();
            while (keys.hasNext()) {
                String w = keys.next();
                Long c = this.counts.get(w);
                writer.write(w + " : " + c);
                writer.newLine();
                writer.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                writer = null;
            }
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}
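One caveat about this bolt: every counter task writes to the same absolute path, so if two tasks were ever scheduled into the same worker they would overwrite each other's output. A minimal sketch of a safer prepare(), assuming a new private String resultPath field that execute() then uses in place of the hard-coded path (getThisTaskId() is standard TopologyContext API; the path convention is just this example's):

@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    this.collector = outputCollector;
    this.counts = new HashMap<String, Long>();
    // One output file per task, e.g. /home/grid/stormData/result-5.txt
    this.resultPath = "/home/grid/stormData/result-" + topologyContext.getThisTaskId() + ".txt";
}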
Implementing the word-count topology
package com.zhch.v3;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class WordCountTopology {
    public static final String SENTENCE_SPOUT_ID = "sentence-spout";
    public static final String SPLIT_BOLT_ID = "split-bolt";
    public static final String COUNT_BOLT_ID = "count-bolt";
    public static final String TOPOLOGY_NAME = "word-count-topology-v3";

    public static void main(String[] args) throws Exception {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
                .directGrouping(SPLIT_BOLT_ID); // use the Direct Grouping strategy

        Config config = new Config();
        config.put("wordsFile", args[0]);

        if (args != null && args.length > 1) {
            config.setNumWorkers(2);
            // Cluster mode
            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
        } else {
            // Local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}
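Two wiring details are worth calling out. emitDirect() in SplitSentenceBolt only works because the counter bolt subscribes with directGrouping(); the two must be paired. Also, some Storm releases expect a stream to be declared as a direct stream before emitDirect() may target it; OutputFieldsDeclarer offers a declare(boolean direct, Fields fields) overload for that, so a hedged variant of SplitSentenceBolt.declareOutputFields would be:

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    // true marks "word" as a direct stream, the target of emitDirect
    outputFieldsDeclarer.declare(true, new Fields("word"));
}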
Submitting to the Storm cluster
storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v3.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v3
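The two trailing arguments are the input file (handed to the spout through the wordsFile config entry) and the topology name. Per the main() method above, omitting the name drops the topology into local mode, so a local smoke test should look something like:

storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v3.WordCountTopology /home/grid/stormData/input.txt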
Run results (the two counter executors were scheduled on different hosts, so each host's result.txt holds only the words whose first letter hashed to that task):
[grid@hadoop5 stormData]$ cat result.txt
second : 1
can : 1
set : 1
simple : 1
use : 2
unbounded : 1
used : 1
It : 1
Storm : 4
online : 1
cases: : 1
open : 1
Apache : 1
of : 2
over : 1
more : 1
clocked : 1
easy : 2
scalable : 1
any : 1
guarantees : 1
ETL : 1
million : 1
continuous : 1
is : 6
with : 1
it : 2
makes : 1
your : 1
a : 4
at : 1
machine : 1
analytics : 1
up : 1
and : 5
many : 1
system : 1
source : 1
what : 1
operate : 1
will : 1
computation : 2
streams : 1

[grid@hadoop6 stormData]$ cat result.txt
to : 3
for : 2
data : 2
distributed : 2
has : 1
free : 1
programming : 1
reliably : 1
fast: : 1
processing : 2
be : 2
Hadoop : 1
did : 1
fun : 1
learning : 1
torm : 1
process : 1
RPC : 1
node : 1
processed : 2
per : 2
realtime : 3
benchmark : 1
batch : 1
doing : 1
lot : 1
language : 1
tuples : 1
fault-tolerant : 1
That wraps up this look at using the Direct Grouping strategy in Storm. Theory sticks best when paired with practice, so give the example a run yourself.