Storm如何接收数据
发表于:2024-12-13 作者:千家信息网编辑
千家信息网最后更新 2024年12月13日,这篇文章主要讲解了"Storm如何接收数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Storm如何接收数据"吧!简要的模拟如何接收数据:packa
千家信息网最后更新 2024年12月13日Storm如何接收数据
这篇文章主要讲解了"Storm如何接收数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Storm如何接收数据"吧!
简要的模拟如何接收数据:
package com.cc.storm.spout;import java.io.IOException;import java.util.Map;import java.util.Random;import java.util.concurrent.LinkedBlockingQueue;import org.apache.log4j.Logger;import redis.clients.jedis.JedisPubSub;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;public class RandomEmitSpout extends BaseRichSpout { private Random _random; private static final long serialVersionUID = 4092527421163270357L; static Logger LOG = Logger.getLogger(RandomEmitSpout.class); private SpoutOutputCollector _collector; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _random = new Random(); } @Override public void nextTuple() { try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } String[] userIds = { "1", "2", "3", "4" }; String[] merchandiseIDS = { "1" }; _collector.emit(new Values(userIds[_random.nextInt(userIds.length)], merchandiseIDS[_random.nextInt(merchandiseIDS.length)])); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("userIdS", "merchandiseIDS")); } @Override public void close() { }}
plus: 如果您采用的是Redis
那么:
package com.cc.storm.spout;import java.util.Map;import java.util.concurrent.LinkedBlockingQueue;import org.apache.log4j.Logger;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.JedisPoolConfig;import redis.clients.jedis.JedisPubSub;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;public class RedisPubSubSpout extends BaseRichSpout { /** * @Fields serialVersionUID : TODO */ private static final long serialVersionUID = 4092527421163270357L; static Logger LOG = Logger.getLogger(RedisPubSubSpout.class); private SpoutOutputCollector _collector; private final String host; private final int port; private final String pattern; LinkedBlockingQueuequeue; JedisPool pool; public RedisPubSubSpout(String host, int port, String pattern) { // TODO Auto-generated constructor stub this.host = host; this.port = port; this.pattern = pattern; } // 监听线程,从redis订阅的兴趣事件中获取数据 class ListenerThread extends Thread { private LinkedBlockingQueue queue; JedisPool pool; String pattern; public ListenerThread(LinkedBlockingQueue queue, JedisPool pool, String pattern) { // TODO Auto-generated constructor stub this.queue = queue; this.pool = pool; this.pattern = pattern; } @Override public void run() { JedisPubSub listener = new JedisPubSub() { @Override public void onUnsubscribe(String arg0, int arg1) { // TODO Auto-generated method stub } @Override public void onSubscribe(String arg0, int arg1) { // TODO Auto-generated method stub } @Override public void onPUnsubscribe(String arg0, int arg1) { // TODO Auto-generated method stub } @Override public void onPSubscribe(String arg0, int arg1) { // TODO Auto-generated method stub } @Override public void onPMessage(String pattern, String channel, String message) { // TODO Auto-generated method stub queue.offer(message); } @Override public void onMessage(String channel, String message) { // TODO Auto-generated method stub queue.offer(message); } }; Jedis jedis = pool.getResource(); try { jedis.psubscribe(listener, pattern); } finally { pool.returnResource(jedis); } } } @SuppressWarnings("rawtypes") @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { // TODO Auto-generated method stub _collector = collector; // 队列最大支持1000个 queue = new LinkedBlockingQueue (1000); JedisPoolConfig config = new JedisPoolConfig(); // error pool = null; ListenerThread listener = new ListenerThread(queue, pool, pattern); // 启动线程 listener.start(); } @Override public void nextTuple() { // TODO Auto-generated method stub String ret = queue.poll(); if (null == ret) { // 如果队列中暂无数据可取,休息500ms Utils.sleep(500); } else { // 数据格式为 "userID:merchandiseID",可以依据需求更改此处 String[] s = ret.split(":"); _collector.emit(new Values(s[0], s[1])); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("userIdS", "merchandiseIDS")); } @Override public void close() { // TODO Auto-generated method stub pool.destroy(); }}
感谢各位的阅读,以上就是"Storm如何接收数据"的内容了,经过本文的学习后,相信大家对Storm如何接收数据这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
学习
内容
线程
队列
最大
事件
兴趣
就是
思路
情况
文章
更多
格式
知识
知识点
简要
篇文章
跟着
问题
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
洛阳市网络安全监察部门
银行贷款客户数据库
linq查询第一行数据库
网络安全靠人蒙文报纸
黑产网络安全
名师讲坛网络安全
江苏球讯网络技术有限公司
线上锁客网络技术
三级数据库考试用哪个软件
w5500如何与服务器通信
火车头采集如何保存尽数据库
长沙全速网络技术有限公司地址
网络安全宪法手抄报
马鞍山电力软件开发多少钱
删除数据库表里的一个字段的数据
网络安全知识微信公众号
校园网络安全课程设计
头条服务器数据保存多久
橘子数据库
服务器根目录管理
中国电科网络安全评论
安徽app软件开发哪家便宜
王者一台服务器多人使用
校园网络安全 标语
服务器管理ip地址是多少
数据库服务器安装失败
关于数据库关联查询的类型
中国人民共国网络安全法
宽带网络技术表
三级计算机网络技术报考条件