storm的本地模式demo怎么实现
发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,本篇内容介绍了"storm的本地模式demo怎么实现"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!S
千家信息网最后更新 2025年02月05日storm的本地模式demo怎么实现
本篇内容介绍了"storm的本地模式demo怎么实现"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
SimpleTopology.java
package com.zgl.helloword;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.topology.TopologyBuilder;/** * 定义了一个简单的topology,包括一个数据喷发节点spout和一个数据处理节点bolt。 * * @author Administrator * */public class SimpleTopology { public static void main(String[] args) { try { // 实例化TopologyBuilder类。 TopologyBuilder topologyBuilder = new TopologyBuilder(); // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。 topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1); // 设置数据处理节点并分配并发数。指定该节点接收喷发节点的策略为随机方式。 topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping("SimpleSpout"); Config config = new Config(); config.setDebug(false); if (args != null && args.length > 0) { config.setNumWorkers(1); StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology()); } else { // 这里是本地模式下运行的启动代码。 config.setMaxTaskParallelism(1); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("simple", config, topologyBuilder.createTopology()); } } catch (Exception e) { e.printStackTrace(); } }}
SimpleSpout.java
package com.zgl.helloword;import java.util.Map;import java.util.Random;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;/** * Spout起到和外界沟通的作用,他可以从一个数据库中按照某种规则取数据,也可以从分布式队列中取任务 * * @author Administrator * */public class SimpleSpout extends BaseRichSpout{ /** * */ private static final long serialVersionUID = 1L; //用来发射数据的工具类 private SpoutOutputCollector collector; private static String[] info = new String[]{ "comaple\t,12424,44w46,654,12424,44w46,654,", "lisi\t,435435,6537,12424,44w46,654,", "lipeng\t,45735,6757,12424,44w46,654,", "hujintao\t,45735,6757,12424,44w46,654,", "jiangmin\t,23545,6457,2455,7576,qr44453", "beijing\t,435435,6537,12424,44w46,654,", "xiaoming\t,46654,8579,w3675,85877,077998,", "xiaozhang\t,9789,788,97978,656,345235,09889,", "ceo\t,46654,8579,w3675,85877,077998,", "cto\t,46654,8579,w3675,85877,077998,", "zhansan\t,46654,8579,w3675,85877,077998,"}; Random random=new Random(); /** * 初始化collector */ @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } /** * 在SpoutTracker类中被调用,每调用一次就可以向storm集群中发射一条数据(一个tuple元组),该方法会被不停的调用 */ public void nextTuple() { try { String msg = info[random.nextInt(11)]; // 调用发射方法 collector.emit(new Values(msg)); // 模拟等待100ms Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。 * 该declarer变量有很大作用,我们还可以调用declarer.declareStream();来定义stramId,该id可以用来定义更加复杂的流拓扑结构 */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("source")); //collector.emit(new Values(msg));参数要对应 }}
SimpleBolt.java
package com.zgl.helloword;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseBasicBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;/** * 接收喷发节点(Spout)发送的数据进行简单的处理后,发射出去。 * * @author Administrator * */@SuppressWarnings("serial")public class SimpleBolt extends BaseBasicBolt { public void execute(Tuple input, BasicOutputCollector collector) { try { String msg = input.getString(0); if (msg != null){ System.out.println("msg="+msg); collector.emit(new Values(msg + "msg is processed!")); } } catch (Exception e) { e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("info")); }}
pom.xml
4.0.0 strom-zgl storm-zgl 0.0.1-SNAPSHOT jar storm-zgl http://maven.apache.org UTF-8 junit junit 3.8.1 test org.apache.storm storm-core 0.9.1-incubating
"storm的本地模式demo怎么实现"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
数据
节点
模式
发射
处理
很大
作用
内容
字段
数据处理
方法
更多
用处
知识
集群
分配
复杂
实用
学有所成
接下来
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
gps最佳数据库
鼎天云宏网络技术定位是真的吗
网络安全的模拟器怎么下载
运维跪拜服务器图片
dns服务器在哪找到
温州公司软件开发
招聘惠州数据库
上海蜗牛网络技术有限公司
vb软件开发短信
现在软件开发主流编译器
数据库双向审计是什么意思
查询数据库语句失败的原因
网络安全拓扑图
边缘日记软件开发
mc1.10.2服务器
郧西软件开发市场
赌博棋牌软件开发成本
打印机服务器控制打印权限
徐州房居家网络技术公司
服务器加显卡开机黑屏
武汉千途创造软件开发
关于数据库技术的应用举例
亿利达前台数据库断开
软件开发 框架方案
软件开发要写代码吗
jsp数据库插入不了
软件开发 给公司建议
网络安全杂志有哪些内容
服务器故障检测报告
个人电脑能不能做数据库服务器