Storm如何接收数据
发表于:2025-02-06 作者:千家信息网编辑
千家信息网最后更新 2025年02月06日,这篇文章主要讲解了"Storm如何接收数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Storm如何接收数据"吧!简要的模拟如何接收数据:packa
千家信息网最后更新 2025年02月06日Storm如何接收数据
这篇文章主要讲解了"Storm如何接收数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Storm如何接收数据"吧!
简要的模拟如何接收数据:
package com.cc.storm.spout;import java.io.IOException;import java.util.Map;import java.util.Random;import java.util.concurrent.LinkedBlockingQueue;import org.apache.log4j.Logger;import redis.clients.jedis.JedisPubSub;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;public class RandomEmitSpout extends BaseRichSpout { private Random _random; private static final long serialVersionUID = 4092527421163270357L; static Logger LOG = Logger.getLogger(RandomEmitSpout.class); private SpoutOutputCollector _collector; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _random = new Random(); } @Override public void nextTuple() { try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } String[] userIds = { "1", "2", "3", "4" }; String[] merchandiseIDS = { "1" }; _collector.emit(new Values(userIds[_random.nextInt(userIds.length)], merchandiseIDS[_random.nextInt(merchandiseIDS.length)])); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("userIdS", "merchandiseIDS")); } @Override public void close() { }}
plus: 如果您采用的是Redis
那么:
package com.cc.storm.spout;import java.util.Map;import java.util.concurrent.LinkedBlockingQueue;import org.apache.log4j.Logger;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.JedisPoolConfig;import redis.clients.jedis.JedisPubSub;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;public class RedisPubSubSpout extends BaseRichSpout { /** * @Fields serialVersionUID : TODO */ private static final long serialVersionUID = 4092527421163270357L; static Logger LOG = Logger.getLogger(RedisPubSubSpout.class); private SpoutOutputCollector _collector; private final String host; private final int port; private final String pattern; LinkedBlockingQueuequeue; JedisPool pool; public RedisPubSubSpout(String host, int port, String pattern) { // TODO Auto-generated constructor stub this.host = host; this.port = port; this.pattern = pattern; } // 监听线程,从redis订阅的兴趣事件中获取数据 class ListenerThread extends Thread { private LinkedBlockingQueue queue; JedisPool pool; String pattern; public ListenerThread(LinkedBlockingQueue queue, JedisPool pool, String pattern) { // TODO Auto-generated constructor stub this.queue = queue; this.pool = pool; this.pattern = pattern; } @Override public void run() { JedisPubSub listener = new JedisPubSub() { @Override public void onUnsubscribe(String arg0, int arg1) { // TODO Auto-generated method stub } @Override public void onSubscribe(String arg0, int arg1) { // TODO Auto-generated method stub } @Override public void onPUnsubscribe(String arg0, int arg1) { // TODO Auto-generated method stub } @Override public void onPSubscribe(String arg0, int arg1) { // TODO Auto-generated method stub } @Override public void onPMessage(String pattern, String channel, String message) { // TODO Auto-generated method stub queue.offer(message); } @Override public void onMessage(String channel, String message) { // TODO Auto-generated method stub queue.offer(message); } }; Jedis jedis = pool.getResource(); try { jedis.psubscribe(listener, pattern); } finally { pool.returnResource(jedis); } } } @SuppressWarnings("rawtypes") @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { // TODO Auto-generated method stub _collector = collector; // 队列最大支持1000个 queue = new LinkedBlockingQueue (1000); JedisPoolConfig config = new JedisPoolConfig(); // error pool = null; ListenerThread listener = new ListenerThread(queue, pool, pattern); // 启动线程 listener.start(); } @Override public void nextTuple() { // TODO Auto-generated method stub String ret = queue.poll(); if (null == ret) { // 如果队列中暂无数据可取,休息500ms Utils.sleep(500); } else { // 数据格式为 "userID:merchandiseID",可以依据需求更改此处 String[] s = ret.split(":"); _collector.emit(new Values(s[0], s[1])); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("userIdS", "merchandiseIDS")); } @Override public void close() { // TODO Auto-generated method stub pool.destroy(); }}
感谢各位的阅读,以上就是"Storm如何接收数据"的内容了,经过本文的学习后,相信大家对Storm如何接收数据这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
学习
内容
线程
队列
最大
事件
兴趣
就是
思路
情况
文章
更多
格式
知识
知识点
简要
篇文章
跟着
问题
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
AB服务器图片
worknc怎么不能启动服务器
excel使用数据库
国内数据库技术发展史
软件开发干不下去了该怎么办
优学帮服务器出错了
云计算网络技术工程师招聘
数据库组织机构
金太阳考试服务器
代码实现数据库导入excel
liunx系统 数据库安装
招聘网络技术运营
数据库返回字符串长度包括符号吗
中国移动服务器机柜
国家网络安全的感悟
Mongodb服务器老是挂
服务器作品欣赏
校园网服务器连接电脑
网络技术员如何开展工作
数据库与数据仓库需求分析的不同
服务器电源分配器怎么启动
找回原来的网络服务器
中国农业大学数据库试卷
提升网络安全主动防御能力
关于网络安全的调查问卷怎么做
网络安全法军事案例
如何变管理为服务器
幼儿园网络安全文明上网展板
创业大赛软件开发财务数据
树莓派服务器搭配