千家信息网

Storm如何接收数据

发表于:2024-12-13 作者:千家信息网编辑
千家信息网最后更新 2024年12月13日,这篇文章主要讲解了"Storm如何接收数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Storm如何接收数据"吧!简要的模拟如何接收数据:……
千家信息网最后更新 2024年12月13日Storm如何接收数据

这篇文章主要讲解了"Storm如何接收数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Storm如何接收数据"吧!

简要的模拟如何接收数据:

package com.cc.storm.spout;import java.io.IOException;import java.util.Map;import java.util.Random;import java.util.concurrent.LinkedBlockingQueue;import org.apache.log4j.Logger;import redis.clients.jedis.JedisPubSub;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;public class RandomEmitSpout extends BaseRichSpout {        private Random _random;        private static final long serialVersionUID = 4092527421163270357L;        static Logger LOG = Logger.getLogger(RandomEmitSpout.class);        private SpoutOutputCollector _collector;        @Override        public void open(Map conf, TopologyContext context,                        SpoutOutputCollector collector) {                _collector = collector;                _random = new Random();        }        @Override        public void nextTuple() {                try {                        Thread.sleep(1000);                } catch (Exception e) {                        e.printStackTrace();                }                String[] userIds = { "1", "2", "3", "4" };                String[] merchandiseIDS = { "1" };                _collector.emit(new Values(userIds[_random.nextInt(userIds.length)],                                merchandiseIDS[_random.nextInt(merchandiseIDS.length)]));        }        @Override        public void declareOutputFields(OutputFieldsDeclarer declarer) {                // TODO Auto-generated method stub                declarer.declare(new Fields("userIdS", "merchandiseIDS"));        }        @Override        public void close() {        }}

plus: 如果您采用的是Redis

那么:

package com.cc.storm.spout;import java.util.Map;import java.util.concurrent.LinkedBlockingQueue;import org.apache.log4j.Logger;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.JedisPoolConfig;import redis.clients.jedis.JedisPubSub;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;public class RedisPubSubSpout extends BaseRichSpout {        /**         * @Fields serialVersionUID : TODO         */        private static final long serialVersionUID = 4092527421163270357L;        static Logger LOG = Logger.getLogger(RedisPubSubSpout.class);        private SpoutOutputCollector _collector;        private final String host;        private final int port;        private final String pattern;        LinkedBlockingQueue queue;        JedisPool pool;        public RedisPubSubSpout(String host, int port, String pattern) {                // TODO Auto-generated constructor stub                this.host = host;                this.port = port;                this.pattern = pattern;        }        // 监听线程,从redis订阅的兴趣事件中获取数据        class ListenerThread extends Thread {                private LinkedBlockingQueue queue;                JedisPool pool;                String pattern;                public ListenerThread(LinkedBlockingQueue queue,                                JedisPool pool, String pattern) {                        // TODO Auto-generated constructor stub                        this.queue = queue;                        this.pool = pool;                        this.pattern = pattern;                }                @Override                public void run() {                        JedisPubSub listener = new JedisPubSub() {                                @Override                 
               public void onUnsubscribe(String arg0, int arg1) {                                        // TODO Auto-generated method stub                                }                                @Override                                public void onSubscribe(String arg0, int arg1) {                                        // TODO Auto-generated method stub                                }                                @Override                                public void onPUnsubscribe(String arg0, int arg1) {                                        // TODO Auto-generated method stub                                }                                @Override                                public void onPSubscribe(String arg0, int arg1) {                                        // TODO Auto-generated method stub                                }                                @Override                                public void onPMessage(String pattern, String channel,                                                String message) {                                        // TODO Auto-generated method stub                                        queue.offer(message);                                }                                @Override                                public void onMessage(String channel, String message) {                                        // TODO Auto-generated method stub                                        queue.offer(message);                                }                        };                        Jedis jedis = pool.getResource();                        try {                                jedis.psubscribe(listener, pattern);                        } finally {                                pool.returnResource(jedis);                        }                }        }        @SuppressWarnings("rawtypes")        @Override        public void open(Map conf, TopologyContext context,                        SpoutOutputCollector 
collector) {                // TODO Auto-generated method stub                _collector = collector;                // 队列最大支持1000个                queue = new LinkedBlockingQueue(1000);                JedisPoolConfig config = new JedisPoolConfig();                // error                pool = null;                ListenerThread listener = new ListenerThread(queue, pool, pattern);                // 启动线程                listener.start();        }        @Override        public void nextTuple() {                // TODO Auto-generated method stub                String ret = queue.poll();                if (null == ret) {                        // 如果队列中暂无数据可取,休息500ms                        Utils.sleep(500);                } else {                        // 数据格式为 "userID:merchandiseID",可以依据需求更改此处                        String[] s = ret.split(":");                        _collector.emit(new Values(s[0], s[1]));                }        }        @Override        public void declareOutputFields(OutputFieldsDeclarer declarer) {                // TODO Auto-generated method stub                declarer.declare(new Fields("userIdS", "merchandiseIDS"));        }        @Override        public void close() {                // TODO Auto-generated method stub                pool.destroy();        }}

感谢各位的阅读,以上就是"Storm如何接收数据"的内容了,经过本文的学习后,相信大家对Storm如何接收数据这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是千家信息网,小编将为大家推送更多相关知识点的文章,欢迎关注!

0