千家信息网

storm如何配置使用

发表于:2025-02-12 作者:千家信息网编辑
千家信息网最后更新 2025年02月12日,这篇文章主要为大家展示了"storm如何配置使用",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"storm如何配置使用"这篇文章吧。示例代码如下:#stor
千家信息网最后更新 2025年02月12日storm如何配置使用

这篇文章主要为大家展示了"storm如何配置使用",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"storm如何配置使用"这篇文章吧。

示例代码如下:

#storm.yaml   配置#zookeeper storm.zookeeper.servers:    - "bigdata01"    - "bigdata02"    - "bigdata03"#本地存放数据的路径storm.local.dir: "/apps/storm"#nimbus masternimbus.seeds: ["bigdata00"]#workder端口supervisor.slots.ports:    - 6700    - 6701    - 6702    - 6703                启动命令        bin/ nohup storm nimbus &                bin/ nohup storm supervisor &        bin/ nohup storm ui &--------------------------------------------------------------------------------------package com.hgs.storm;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.generated.AlreadyAliveException;import org.apache.storm.generated.AuthorizationException;import org.apache.storm.generated.InvalidTopologyException;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.IRichBolt;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.topology.base.BaseRichSpout;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;public class StormWordCountTest {        public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, InterruptedException {                TopologyBuilder builder = new TopologyBuilder();                builder.setSpout("wordspout", new WordCountSpout(), 3);                builder.setBolt("splitword", (IRichBolt) new WordSpliteBolt(), 2).shuffleGrouping("wordspout");                //word 是splitword发出的字段,如第九十行                builder.setBolt("wordcount", new WordCountBolt(), 2).fieldsGrouping("splitword", new Fields("word"));                Config config = new Config();                config.setNumWorkers(2);/*              StormSubmitter.submitTopology("words-count", config, builder.createTopology());                        if(args!=null && args.length>0) {                        StormSubmitter.submitTopology(args[0], config, builder.createTopology());                }else {                        LocalCluster cluster = new LocalCluster();                                        }*/                LocalCluster cluster = new LocalCluster();                cluster.submitTopology("words-count", config, builder.createTopology());                        }}class WordCountSpout extends BaseRichSpout{        private static final long serialVersionUID = 1L;        //从open方法中的到collector,用于declareOutputFields 方法发出字段信息        SpoutOutputCollector collector = null;        @Override        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {                this.collector = collector;        }        @Override        public void nextTuple() {                                collector.emit(new Values(" this is my first storm program so i hope it will success"));        }        @Override        public void declareOutputFields(OutputFieldsDeclarer declarer) {                declarer.declare( new Fields("message"));        }        }class WordSpliteBolt extends BaseRichBolt{        private static final long serialVersionUID = 1L;        OutputCollector collector = null;        @Override        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {                this.collector = collector;        }        @Override        public void execute(Tuple input) {                String line  = input.getString(0);                String[] words  = line.split(" ");                for(String wd : words) {                        collector.emit(new Values(wd ,1));                }                }        @Override        public void declareOutputFields(OutputFieldsDeclarer declarer) {                declarer.declare(new Fields("word","num"));        }        }class WordCountBolt extends BaseRichBolt{        ConcurrentHashMap wordsMap = new ConcurrentHashMap();        private static final long serialVersionUID = 1L;        OutputCollector collector = null;        @Override        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {                this.collector = collector;        }        @Override        public void execute(Tuple input) {                String word = input.getString(0);                Integer num = input.getInteger(1);                if(wordsMap.containsKey(word)) {                        wordsMap.put(word, wordsMap.get(word)+num);                }else {                        wordsMap.put(word, num);                }                System.out.println(word +"----"+wordsMap.get(word));        }        @Override        public void declareOutputFields(OutputFieldsDeclarer declarer) {                        }                }

以上是"storm如何配置使用"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!

配置 内容 篇文章 字段 方法 学习 帮助 代码 信息 命令 数据 易懂 更多 条理 知识 示例 端口 编带 行业 资讯 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 泗县啊洁网络技术有限公司 网络安全与维护app 好的网络技术培训条件 服务器型 目前网络安全面临着什么威胁 网络安全竞赛主办方 bscs结构如何整合数据库 手机开mc基岩版服务器 常用的服务器管理命令 软件开发项目部日常管理制度 长沙移动软件开发流程 彭山区地籍数据库项目招标结果 数据通信与网络技术课程教案 dell服务器绿红灯闪 华为服务器怎么把千兆口改成百兆 软件开发公司经营指标 厦门武夷山软件开发 罗克韦尔大学项目部网络技术论坛 广东闲游网络技术有限公司 政府怎么保证网络安全 轻亨互联网科技有限公司 机房服务器信号防护 陕西党员教育软件开发电话 软件开发实习周志标题 五年级的网络安全宣传手抄报 庐阳区综合网络技术咨询优势 甘肃数据库安全审计系统 数据库应用技术实训项目 原神怎么看才知道是哪个服务器 宁波软件开发学
0