千家信息网

Storm排序怎么实现

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,这篇文章主要讲解了"Storm排序怎么实现",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Storm排序怎么实现"吧!阅读背景:1 : 您需要对滑动窗口
千家信息网最后更新 2025年02月03日Storm排序怎么实现

这篇文章主要讲解了"Storm排序怎么实现",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Storm排序怎么实现"吧!

阅读背景:

1 : 您需要对滑动窗口要初步了解

2 : 您需要了解滑动窗口在滑动的过程之中,滑动chunk的计算过程,尤其是每发射一次,就需要清空一次。

package com.cc.storm.bolt;import java.util.HashMap;import java.util.HashSet;import java.util.Map;import java.util.Set;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;/** * 1 在这里我们需要去实现一个滑动窗口,请注意,在我们实现滑动窗口的过程之中清空的是当前滑动窗口的下一个 *  *  *  * @author Yin Shuai *  */public class RollingCountBolt implements IRichBolt {        private static final long serialVersionUID = 1765379339552134320L;        private HashMap _objectCounts = new HashMap();        private int _numBuckets;        private transient Thread cleaner;        private OutputCollector _collector;        /**         * _trackMinute         * 是我们整个滑动窗口的大小,滑动窗口的大小,本质上决定了我们的时间区间,也就是说,假设我们目前滑动窗口的总体大小为15分钟。         * 那我们的商品点击的实时排序的指标值,好比商品浏览量的计算值,也就是15分钟         *          * 而单个窗口的大小也就是我,我们这个三十分钟在随着时间不断的在推移         *          * 举例说明:在最初的构造过程之中,如果我们的桶的数目为10,那么单个窗口的时间长度为3.         *          * [0,30],[3,33],[6,36],[9,39],[12,42] 统计的数值处在不断的变化之中         *          */        private int _trackMinutes;        public RollingCountBolt(int numBuckets, int trackMinutes) {                this._numBuckets = numBuckets;                this._trackMinutes = trackMinutes;        }        public long totalObjects(Object obj) {                long[] curr = _objectCounts.get(obj);                long total = 0;                for (long l : curr) {                        total += l;                }                return total;        }        public int currentBucket(int buckets) {                return currentSecond() / secondsPerBucket(buckets) % buckets;        }        public int currentSecond() {                return (int) (System.currentTimeMillis() / 1000);        }        /**         *          * @param buckets         *            你设定的桶的数量         * @return 依据我们默认的_trackMinutes / buckets 得到每一个桶的数量         */        public int secondsPerBucket(int buckets) {                return _trackMinutes * 60 / buckets;        }        public long millisPerBucket(int buckets) {                return (long) 1000 * secondsPerBucket(buckets);        }        @SuppressWarnings("rawtypes")        @Override        public void prepare(Map stormConf, TopologyContext context,                        OutputCollector collector) {                // TODO Auto-generated method stub                _collector = collector;                cleaner = new Thread(new Runnable() {                        @SuppressWarnings("unchecked")                        @Override                        public void run() {                                // TODO Auto-generated method stub                                int lastBucket = currentBucket(_numBuckets);                                while (true) {                                        int currBucket = currentBucket(_numBuckets);                                        p("线程while循环: 当前的桶为:" + currBucket);                                        if (currBucket != lastBucket) {                                                p("线程while循环:之前的桶数为:" + lastBucket);                                                int bucketToWipe = (currBucket + 1) % _numBuckets;                                                p("线程while循环:要擦除掉的桶为:" + bucketToWipe);                                                synchronized (_objectCounts) {                                                        Set objs = new HashSet(_objectCounts.keySet());                                                        for (Object obj : objs) {                                                                long[] counts = _objectCounts.get(obj);                                                                long currBucketVal = counts[bucketToWipe];                                                                p("线程while循环:擦除掉的值为:" + currBucketVal);                                                                counts[bucketToWipe] = 0;                                                                long total = totalObjects(obj);                                                                if (currBucketVal != 0) {                                                                        p("线程while循环:擦除掉的值为不为0:那就发射数据:obj total"                                                                                        + obj + ":" + total);                                                                        _collector.emit(new Values(obj, total));                                                                }                                                                if (total == 0) {                                                                        p("线程while循环: 总数为0以后,将obj对象删除");                                                                        _objectCounts.remove(obj);                                                                }                                                        }                                                }                                                lastBucket = currBucket;                                        }                                        long delta = millisPerBucket(_numBuckets)                                                        - (System.currentTimeMillis() % millisPerBucket(_numBuckets));                                        Utils.sleep(delta);                                        p("\n");                                }                        }                });                cleaner.start();        }        @Override        public void execute(Tuple input) {                Object obj1 = input.getValue(0);                Object obj = input.getValue(1);                int currentBucket = currentBucket(_numBuckets);                p("execute方法:当前桶:bucket: " + currentBucket);                synchronized (_objectCounts) {                        long[] curr = _objectCounts.get(obj);                        if (curr == null) {                                curr = new long[_numBuckets];                                _objectCounts.put(obj, curr);                        }                        curr[currentBucket]++;                        System.err                                        .print(("execute方法:接受到的merchandiseIDS:" + obj.toString() + ",long数组:"));                        for (long number : curr) {                                System.err.print(number + ":");                        }                        p("execute方法:发射的数据: " + obj + ":" + totalObjects(obj));                        /**                         * 我们不断的发射的也就是我们某一个商品id,在当前滑动窗口,也就是我们的时间周期内的指标计算值                         * 要注意,在排序的过程之中,我们只针对key, 也就是我们的商品id,由此发射给后续的排序bolt依据包含了时间区间的信息                         */                        // 每来一条数据,就会发射一次                        _collector.emit(new Values(obj, totalObjects(obj)));                        _collector.ack(input);                }                p("\n");        }        @Override        public void cleanup() {                // TODO Auto-generated method stub        }        @Override        public void declareOutputFields(OutputFieldsDeclarer declarer) {                // TODO Auto-generated method stub                declarer.declare(new Fields("merchandiseID", "count"));        }        @Override        public Map getComponentConfiguration() {                // TODO Auto-generated method stub                return null;        }        public void p(Object o) {                System.err.println(o.toString());        }}

在这里,最需要我们关注的地方是,滑动窗口每滑动一次,将情况一组数据。 而发射数据的过程之中将统计这一组数

据。

感谢各位的阅读,以上就是"Storm排序怎么实现"的内容了,经过本文的学习后,相信大家对Storm排序怎么实现这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

0