如何进行Twitter Storm Stream Grouping编写自定义分组实现
本篇文章为大家展示了如何进行Twitter Storm Stream Grouping编写自定义分组实现,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
##自定义Grouping测试
Storm是支持自定义分组的,本篇文章就是探究Storm如何编写一个自定义分组器,以及对Storm分组器如何分组数据的理解。
这是我写的一个自定义分组,总是把数据分到第一个Task:
public class MyFirstStreamGrouping implements CustomStreamGrouping { private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.class); private Listtasks; @Override public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List targetTasks) { this.tasks = targetTasks; log.info(tasks.toString()); } @Override public List chooseTasks(int taskId, List
从上面的代码可以看出,该自定义分组会把数据归并到第一个TaskArrays.asList(tasks.get(0));
,也就是数据到达后总是被派发到第一组。
测试代码:
TopologyBuilder builder = new TopologyBuilder();builder.setSpout("words", new TestWordSpout(), 2); //自定义分组,builder.setBolt("exclaim1", new DefaultStringBolt(), 3) .customGrouping("words", new MyFirstStreamGrouping());
和之前的测试用例一样,Spout总是发送new String[] {"nathan", "mike", "jackson", "golda", "bertels"}
列表的字符串。我们运行验证一下:
11878 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson11943 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [nathan]11944 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan11979 [Thread-29-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]11980 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike12045 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]12045 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson12080 [Thread-29-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]12081 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson12145 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]12146 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
从这个运行日志我们可以看出,数据总是派发到一个Blot:Thread-25-exclaim1。因为我时本地测试,Thread-25-exclaim1是线程名。而派发的线程是数据多个线程的。因此该测试符合预期,即总是发送到一个Task,并且这个Task也是第一个。
##理解自定义分组实现
自己实现一个自定义分组难吗?其实如果你理解了Hadoop的Partitioner,Storm的CustomStreamGrouping和它也是一样的道理。
Hadoop MapReduce的Map完成后会把Map的中间结果写入磁盘,在写磁盘前,线程首先根据数据最终要传送到的Reducer把数据划分成相应的分区,然后不同的分区进入不同的Reduce。我们先来看看Hadoop是怎样把数据怎样分组的,这是Partitioner唯一一个方法:
public class Partitioner{ @Override public int getPartition(K key, V value, int numReduceTasks) { return 0; }}
上面的代码中:Map输出的数据都会经过getPartition()方法,用来确定下一步的分组。numReduceTasks是一个Job的Reduce数量,而返回值就是确定该条数据进入哪个Reduce。返回值必须大于等于0,小于numReduceTasks,否则就会报错。返回0就意味着这条数据进入第一个Reduce。对于随机分组来说,这个方法可以这么实现:
public int getPartition(K key, V value, int numReduceTasks) { return hash(key) % numReduceTasks;}
其实Hadoop 默认的Hash分组策略也正是这么实现的。这样好处是,数据在整个集群基本上是负载平衡的。
搞通了Hadoop的Partitioner,我们来看看Storm的CustomStreamGrouping。
这是CustomStreamGrouping类的源码:
public interface CustomStreamGrouping extends Serializable { void prepare(WorkerTopologyContext context, GlobalStreamId stream, ListtargetTasks); List chooseTasks(int taskId, List
一模一样的道理,targetTasks就是Storm运行时告诉你,当前有几个目标Task可以选择,每一个都给编上了数字编号。而 chooseTasks(int taskId, List
就是让你选择,你的这条数据values,是要哪几个目标Task处理?
如上文文章开头的自定义分组器实现的代码,我选择的总是让第一个Task来处理数据, return Arrays.asList(tasks.get(0));
。和Hadoop不同的是,Storm允许一条数据被多个Task处理,因此返回值是List
上述内容就是如何进行Twitter Storm Stream Grouping编写自定义分组实现,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注行业资讯频道。