千家信息网

storm中如何自定义数据分组

发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,今天就跟大家聊聊有关storm中如何自定义数据分组,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。数据流组设计一个拓扑时,你要做的最重要的事情之
千家信息网最后更新 2025年01月31日storm中如何自定义数据分组

今天就跟大家聊聊有关storm中如何自定义数据分组,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

数据流组

设计一个拓扑时,你要做的最重要的事情之一就是定义如何在各组件之间交换数据(数据流是如何被bolts消费的)。一个数据流组指定了每个bolt会消费哪些数据流,以及如何消费它们。

storm自带数据流组

随机数据流组

随机流组是最常用的数据流组。它只有一个参数(数据源组件),并且数据源会向随机选择的bolt发送元组,保证每个消费者收到近似数量的元组。

 builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");

域数据流组

域数据流组允许你基于元组的一个或多个域控制如何把元组发送给bolts。它保证拥有相同域组合的值集发送给同一个bolt。回到单词计数器的例子,如果你用word域为数据流分组,word-normalizer bolt将只会把相同单词的元组发送给同一个word-counterbolt实例。

 builder.setBolt("word-counter", new WordCounter(),2)           .fieldsGrouping("word-normalizer", new Fields("word"));

全部数据流组

全部数据流组,为每个接收数据的实例复制一份元组副本。这种分组方式用于向bolts发送信号。比如,你要刷新缓存,你可以向所有的bolts发送一个刷新缓存信号。在单词计数器的例子里,你可以使用一个全部数据流组,添加清除计数器缓存的功能

builder.setBolt("word-counter", new WordCounter(),2)           .fieldsGroupint("word-normalizer",new Fields("word"))           .allGrouping("signals-spout","signals");

直接数据流组

这是一个特殊的数据流组,数据源可以用它决定哪个组件接收元组

 builder.setBolt("word-counter", new WordCounter(),2)           .directGrouping("word-normalizer");

。与前面的例子类似,数据源将根据单词首字母决定由哪个bolt接收元组。要使用直接数据流组,在WordNormalizer bolt中,使用emitDirect方法代替emit。

public void execute(Tuple input) {        ...        for(String word : words){            if(!word.isEmpty()){                ...                collector.emitDirect(getWordCountIndex(word),new Values(word));            }        }        //对元组做出应答        collector.ack(input);    }    public Integer getWordCountIndex(String word) {        word = word.trim().toUpperCase();        if(word.isEmpty()){            return 0;        }else{            return word.charAt(0) % numCounterTasks;        }    }

在prepare方法中计算任务数

 public void prepare(Map stormConf, TopologyContext context,                 OutputCollector collector) {        this.collector = collector;        this.numCounterTasks = context.getComponentTasks("word-counter");    }

全局数据流组

全局数据流组把所有数据源创建的元组发送给单一目标实例(即拥有最低ID的任务)。

不分组

这个数据流组相当于随机数据流组。也就是说,使用这个数据流组时,并不关心数据流是如何分组的。

自定义数据流组

storm自定义数据流组和hadoop Partitioner分组很相似,storm自定义分组要实现CustomStreamGrouping接口,接口源码如下:

public interface CustomStreamGrouping extends Serializable { void prepare(WorkerTopologyContext context, GlobalStreamId stream, List targetTasks); List chooseTasks( int taskId, List values); }

targetTasks就是Storm运行时告诉你,当前有几个目标Task可以选择,每一个都给编上了数字编号。而 chooseTasks(int taskId, List values); 就是让你选择,你的这条数据values,是要哪几个目标Task处理?

这是我写的一个自定义分组,总是把数据分到第一个Task:

public class MyFirstStreamGrouping implements CustomStreamGrouping { private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping. class ); private List tasks; @Override public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List targetTasks) { this .tasks = targetTasks; log.info(tasks.toString()); } @Override public List chooseTasks( int taskId, List values) { log.info(values.toString()); return Arrays.asList(tasks.get( 0 )); } }

从上面的代码可以看出,该自定义分组会把数据归并到第一个TaskArrays.asList(tasks.get(0));,也就是数据到达后总是被派发到第一组。和Hadoop不同的是,Storm允许一条数据被多个Task处理,因此返回值是List .就是让你来在提供的 'List targetTasks' Task中选择任意的几个(必须至少是一个)Task来处理数据。

第二个自定义分组,wordcount中使首字母相同的单词交给同一个bolt处理:

public class ModuleGrouping implements CustormStreamGrouping{        int numTasks = 0;        @Override        public List chooseTasks(List values) {            List boltIds = new ArrayList();            if(values.size()>0){                String str = values.get(0).toString();                if(str.isEmpty()){                    boltIds.add(0);                }else{                    boltIds.add(str.charAt(0) % numTasks);                }            }            return boltIds;        }        @Override        public void prepare(TopologyContext context, Fields outFields, List targetTasks) {            numTasks = targetTasks.size();        }    }

这是一个CustomStreamGrouping的简单实现,在这里我们采用单词首字母字符的整数值与任务数的余数,决定接收元组的bolt。

builder.setBolt("word-normalizer", new WordNormalizer())           .customGrouping("word-reader", new ModuleGrouping());

看完上述内容,你们对storm中如何自定义数据分组有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

0