storm如何配置使用
发表于:2025-02-12 作者:千家信息网编辑
千家信息网最后更新 2025年02月12日,这篇文章主要为大家展示了"storm如何配置使用",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"storm如何配置使用"这篇文章吧。示例代码如下:#stor
千家信息网最后更新 2025年02月12日storm如何配置使用
这篇文章主要为大家展示了"storm如何配置使用",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"storm如何配置使用"这篇文章吧。
示例代码如下:
#storm.yaml 配置#zookeeper storm.zookeeper.servers: - "bigdata01" - "bigdata02" - "bigdata03"#本地存放数据的路径storm.local.dir: "/apps/storm"#nimbus masternimbus.seeds: ["bigdata00"]#workder端口supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 启动命令 bin/ nohup storm nimbus & bin/ nohup storm supervisor & bin/ nohup storm ui &--------------------------------------------------------------------------------------package com.hgs.storm;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.generated.AlreadyAliveException;import org.apache.storm.generated.AuthorizationException;import org.apache.storm.generated.InvalidTopologyException;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.IRichBolt;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.topology.base.BaseRichSpout;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;public class StormWordCountTest { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, InterruptedException { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("wordspout", new WordCountSpout(), 3); builder.setBolt("splitword", (IRichBolt) new WordSpliteBolt(), 2).shuffleGrouping("wordspout"); //word 是splitword发出的字段,如第九十行 builder.setBolt("wordcount", new WordCountBolt(), 2).fieldsGrouping("splitword", new Fields("word")); Config config = new Config(); config.setNumWorkers(2);/* StormSubmitter.submitTopology("words-count", config, builder.createTopology()); if(args!=null && args.length>0) { StormSubmitter.submitTopology(args[0], config, builder.createTopology()); }else { LocalCluster cluster = new LocalCluster(); }*/ LocalCluster cluster = new LocalCluster(); cluster.submitTopology("words-count", config, builder.createTopology()); }}class WordCountSpout extends BaseRichSpout{ private static final long serialVersionUID = 1L; //从open方法中的到collector,用于declareOutputFields 方法发出字段信息 SpoutOutputCollector collector = null; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void nextTuple() { collector.emit(new Values(" this is my first storm program so i hope it will success")); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields("message")); } }class WordSpliteBolt extends BaseRichBolt{ private static final long serialVersionUID = 1L; OutputCollector collector = null; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String line = input.getString(0); String[] words = line.split(" "); for(String wd : words) { collector.emit(new Values(wd ,1)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word","num")); } }class WordCountBolt extends BaseRichBolt{ ConcurrentHashMapwordsMap = new ConcurrentHashMap (); private static final long serialVersionUID = 1L; OutputCollector collector = null; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String word = input.getString(0); Integer num = input.getInteger(1); if(wordsMap.containsKey(word)) { wordsMap.put(word, wordsMap.get(word)+num); }else { wordsMap.put(word, num); } System.out.println(word +"----"+wordsMap.get(word)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
以上是"storm如何配置使用"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
配置
内容
篇文章
字段
方法
学习
帮助
代码
信息
命令
数据
易懂
更多
条理
知识
示例
端口
编带
行业
资讯
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
泗县啊洁网络技术有限公司
网络安全与维护app
好的网络技术培训条件
服务器型
目前网络安全面临着什么威胁
网络安全竞赛主办方
bscs结构如何整合数据库
手机开mc基岩版服务器
常用的服务器管理命令
软件开发项目部日常管理制度
长沙移动软件开发流程
彭山区地籍数据库项目招标结果
数据通信与网络技术课程教案
dell服务器绿红灯闪
华为服务器怎么把千兆口改成百兆
软件开发公司经营指标
厦门武夷山软件开发
罗克韦尔大学项目部网络技术论坛
广东闲游网络技术有限公司
政府怎么保证网络安全
轻亨互联网科技有限公司
机房服务器信号防护
陕西党员教育软件开发电话
软件开发实习周志标题
五年级的网络安全宣传手抄报
庐阳区综合网络技术咨询优势
甘肃数据库安全审计系统
数据库应用技术实训项目
原神怎么看才知道是哪个服务器
宁波软件开发学