storm启动类怎么定义
发表于:2025-02-23 作者:千家信息网编辑
千家信息网最后更新 2025年02月23日,这篇文章主要介绍"storm启动类怎么定义",在日常操作中,相信很多人在storm启动类怎么定义问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"storm启动类怎么定义"
千家信息网最后更新 2025年02月23日storm启动类怎么定义
这篇文章主要介绍"storm启动类怎么定义",在日常操作中,相信很多人在storm启动类怎么定义问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"storm启动类怎么定义"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
在storm集群中真正运行Topology的主要有三个实例:工作进程丶线程和任务.
Storm集群中的每台机器上都可以运行多个工作进程,每个工作进程又可以创建多个线程,每个线程可以执行多个任务.
Storm可靠性:是通过对消息树给定一个唯一的ID,每送一个消息,都会同步发送一个ack或fail,对于网络的宽带会有一定的消耗,如果对于可靠性要求不高,可以通过使用不同的emit接口关闭该模式.
一、storm启动类的定义。
package com.cmsz.storm.trading.test;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;public class MainStorm { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("A", new ASpout()); builder.setBolt("B", new BBolt()).shuffleGrouping("A", "streamId_B");//componentId和streamId builder.setBolt("C", new CBolt()).shuffleGrouping("A", "streamId_C");//componentId和streamId builder.setBolt("D", new DBolt()).fieldsGrouping("B", new Fields("id"));//componentId和streamId builder.setBolt("E", new EBolt()).fieldsGrouping("C", new Fields("id")); Config conf = new Config(); if (args != null && args.length > 0) { conf.setNumWorkers(1); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("myTopo", conf, builder.createTopology()); } }}
二、spout定义了streamId,接受的bolt要定义componentId与spout中定义的streamId("streamId_B"、"streamId_C")对应定义去接收,fiedsGrouping的new Fields("id")中的id要和componentId的对应bolt中new Fields("id","message")中的"id"对应就会以"id"进行分组
package com.cmsz.storm.trading.test;import java.util.Map;import java.util.Random;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;public class ASpout extends BaseRichSpout{ SpoutOutputCollector collector; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } @Override public void nextTuple() { Utils.sleep(10); final String[] words = new String[] {"B_nathan", "C_mike", "B_jackson", "C_golda", "B_bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; if(word.indexOf("B_")>-1){ collector.emit("streamId_B",new Values(word)); }else if(word.indexOf("C_")>-1){ collector.emit("streamId_C",new Values(word)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream("streamId_B", new Fields("streamId_B")); declarer.declareStream("streamId_C", new Fields("streamId_C")); } @Override public void ack(Object msgId) { super.ack(msgId); } @Override public void fail(Object msgId) { super.fail(msgId); }}
package com.cmsz.storm.trading.test;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class BBolt implements IBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id","message")); } @Override public MapgetComponentConfiguration() { return null; } @Override public void prepare(Map stormConf, TopologyContext context) { } @Override public void execute(Tuple input, BasicOutputCollector collector) { String msg = input.getString(0); System.out.println(msg); collector.emit(new Values(msg,msg+"BBolt")); } @Override public void cleanup() { }}
package com.cmsz.storm.trading.test;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class CBolt implements IBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id","message")); } @Override public MapgetComponentConfiguration() { return null; } @Override public void prepare(Map stormConf, TopologyContext context) { } @Override public void execute(Tuple input, BasicOutputCollector collector) { String msg = input.getString(0); System.out.println(msg); collector.emit(new Values(msg,msg+"CBolt")); } @Override public void cleanup() { }}
package com.cmsz.storm.trading.test;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;public class DBolt implements IBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("message")); } @Override public MapgetComponentConfiguration() { return null; } @Override public void prepare(Map stormConf, TopologyContext context) { } @Override public void execute(Tuple input, BasicOutputCollector collector) { System.out.println("DBolt"+input.getString(0)); } @Override public void cleanup() { }}
package com.cmsz.storm.trading.test;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;public class EBolt implements IBasicBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("message")); } @Override public MapgetComponentConfiguration() { return null; } @Override public void prepare(Map stormConf, TopologyContext context) { } @Override public void execute(Tuple input, BasicOutputCollector collector) { System.out.println("EBolt"+input.getString(0)); } @Override public void cleanup() { }}
到此,关于"storm启动类怎么定义"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!
学习
多个
线程
进程
工作
任务
可靠性
更多
消息
集群
帮助
运行
不同
实用
接下来
三个
可以通过
实例
接口
文章
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
榆树正规网络技术咨询哪家好
医疗软件开发价格
uuid存在数据库是多少位
网络安全手抄报没有颜色
请求php网站数据库
gdp数据库在哪可查
keaz 软件开发包
网络安全企业的愿景
eplan管理数据库
成都交友软件开发定制费用
网络安全最新例子
徐州老司机公司网络技术有限公司
服务器管理工具有什么提供
贵州销售软件开发费用
搭建本地音乐服务器
软件开发技巧和方法
涉密网络安全保密管理
自己做软件开发需要哪些基础
数据库sql use添加表
网络安全工程师资格
研究生 计算机网络技术
数据库中存的内容可以换行吗
手机软件开发青岗科技
电厂网络安全文件
昆山培训软件开发
建立数据库传承人
应用交付网络技术
上海哎媲媲网络技术咋样
导入数据库英文变中文版
我的世界服务器阿阳