千家信息网

storm中trident是什么

发表于:2024-10-14 作者:千家信息网编辑
千家信息网最后更新 2024年10月14日,这篇文章主要介绍storm中trident是什么,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!简介Storm是一个实时流计算框架,Trident是对storm的一个更高层次的抽
千家信息网最后更新 2024年10月14日storm中trident是什么

这篇文章主要介绍storm中trident是什么,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

简介

Storm是一个实时流计算框架,Trident是对storm的一个更高层次的抽象,Trident最大的特点以batch的形式处理stream。

一些最基本的操作函数有Filter、Function,Filter可以过滤掉tuple,Function可以修改tuple内容,输出0或多个tuple,并能把新增的字段追加到tuple后面。

聚合有partitionAggregate和Aggregator接口。partitionAggregate对当前partition中的tuple进行聚合,它不是重定向操作。Aggregator有三个接口:CombinerAggregator, ReducerAggregator,Aggregator,它们属于重定向操作,它们会把stream重定向到一个partition中进行聚合操作。

重定向操作会改变数据流向,但不会改变数据内容,重定向操会产生网络传输,可能影响一部分效率。而Filter、Function、partitionAggregate则属于本地操作,不会产生网络传输。

GroupBy会根据指定字段,把整个stream切分成一个个grouped stream,如果在grouped stream上做聚合操作,那么聚合就会发生在这些grouped stream上而不是整个batch。如果groupBy后面跟的是aggregator,则是聚合操作,如果跟的是partitionAggregate,则不是聚合操作。

Trident主要有5类操作:

1、作用在本地的操作,不产生网络传输。

2、对数据流的重分布,不改变流的内容,但是产生网络传输。

3、聚合操作,有可能产生网络传输。

4、作用在分组流(grouped streams)上的操作。

5、Merge和join

partition

概念

partition中文意思是分区,有人将partition理解为Storm里面的task,即并发的基本执行单位。我理解应该是像数据库里面的分区,是将一个batch的数据分区,分成多个partition,或者可以理解为多个子batch,然后多个partition可以并发处理。这里关键的区别是:partition是数据,不是执行的代码。你把数据(tuple)分区以后,如果你没有多个task(并发度)来处理这些分区后的数据,那分区也是没有作用的。所以这里的关系是这样的:先有batch,因为Trident内部是基于batch来实现的;然后有partition;分区后再分配并发度,然后才能进行并发处理。并发度的分配是利用parallelismHint来实现的。

操作

既然有partition的概念,那么也就有partition的操作。Trident提供的分区操作,类似于Storm里面讲的grouping。分区操作有:

重分区操作通过运行一个函数改变元组在任务之间的分布,也可以调整分区的数量(比如重分区之后将并行度调大),重分区需要网络传输的参与。重分区函数包含以下这几个:

  1. shuffle:使用随机轮询算法在所有目标分区间均匀分配元组;

  2. broadcast:每个元组复制到所有的目标分区。这在DRPC中非常有用,例如,需要对每个分区的数据做一个stateQuery操作;

  3. partitionBy:接收一些输入字段,根据这些字段输入字段进行语义分区。通过对字段取hash值或者取模来选择目标分区。partitionBy保证相同的字段一定被分配到相同的目标分区;

  4. global:所有的元组分配到相同的分区,该分区是流种所有batch决定的;

  5. batchGlobal:同一个batch中的元组被分配到相同的目标分区,不同batch的元组有可能被分配到不同的目标分区;

  6. partition:接收一个自定义的分区函数,自定义分区函数需要实现backtype.storm.grouping.CustomStreamGrouping接口。

注意,除了这里明确提出来的分区操作,Trident里面还有aggregate()函数隐含有分区的操作,它用的是global()操作,这个在后面接收聚合操作的时候还会再介绍。

API

each() 方法

作用:操作batch中的每一个tuple内容,一般与Filter或者Function函数配合使用。

下面通过一个例子来介绍each()方法,假设我们有一个FakeTweetsBatchSpout,它会模拟一个Stream,随机产生一个个消息。我们可以通过设置这个Spout类的构造参数来改变这个Spout的batch Size的大小。

1.Filter类:过滤tuple

一个通过actor字段过滤消息的Filter:

public static class PerActorTweetsFilter extends BaseFilter {  String actor;  public PerActorTweetsFilter(String actor) {    this.actor = actor;  }  @Override  public boolean isKeep(TridentTuple tuple) {    return tuple.getString(0).equals(actor);  }}

Topology:

topology.newStream("spout", spout)  .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))  .each(new Fields("actor", "text"), new Utils.PrintFilter());

从上面例子看到,each()方法有一些构造参数

  • 第一个构造参数:作为Field Selector,一个tuple可能有很多字段,通过设置Field,我们可以隐藏其它字段,仅仅接收指定的字段(其它字段实际还在)。

  • 第二个是一个Filter:用来过滤掉除actor名叫"dave"外的其它消息。

2.Function类:加工处理tuple内容

一个能把tuple中text内容变成大写的Function:

public static class UppercaseFunction extends BaseFunction {  @Override  public void execute(TridentTuple tuple, TridentCollector collector) {    collector.emit(new Values(tuple.getString(0).toUpperCase()));  }}

Topology:

topology.newStream("spout", spout)  .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))  .each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text"))  .each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());

首先,UppercaseFunction函数的输入是Fields("text", "actor"),其作用是把其中的"text"字段内容都变成大写。

其次,它比Filter多出一个输出字段,作用是每个tuple在经过这个Function函数处理后,输出字段都会被追加到tuple后面,在本例中,执行完Function之后的tuple内容多了一个"uppercased_text",并且这个字段排在最后面。

3. Field Selector与project

我们需要注意的是,上面每个each()方法的第一个Field字段仅仅是隐藏掉没有指定的字段内容,实际上被隐藏的字段依然还在tuple中,如果想要彻底丢掉它们,我们就需要用到project()方法。

投影操作作用是仅保留Stream指定字段的数据,比如有一个Stream包含如下字段: ["a", "b", "c", "d"],运行如下代码:

mystream.project(new Fields("b", "d"))

则输出的流仅包含 ["b", "d"]字段。

aggregation的介绍

首先聚合操作分两种:partitionAggregate(),以及aggregate()。

1.partitionAggregate

partitionAggregate()的操作是在partition上,一个batch的tuple被分成多个partition后,每个partition都会单独运行partitionAggregate中指定的聚合操作。分区聚合在一批tuple的每一个分区上运行一个函数。与函数不同的是,分区聚合的输出元组会覆盖掉输入元组。请看如下示例:

mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

假设你有一个包含a,b两个字段的输入流,元组的分区情况如下:

Partition 0:["a", 1]["b", 2]Partition 1:["a", 3]["c", 8]Partition 2:["e", 1]["d", 9]["d", 10]

运行上面的那一行代码将会输出如下的元组,这些元组只包含一个sum字段:

Partition 0:[3]Partition 1:[11]Partition 2:[20]

2.aggregate

aggregate()隐含了一个global分区操作,也就是它做的是全局聚合操作。它针对的是整个batch的聚合计算。

这两种聚合操作,都可以传入不同的aggregator实现具体的聚合任务。Trident中有三种aggregator接口,分别为:ReducerAggregator,CombinerAggregator,Aggregator。

下面是CombinerAggregator接口的定义:

public interface CombinerAggregator extends Serializable {    T init(TridentTuple tuple);    T combine(T val1, T val2);    T zero();}

CombinerAggregator返回只有一个字段的一个元组。CombinerAggregator在每个输入元组上运行init函数,然后通过combine函数聚合结果值直到只剩下一个元组。如果分区中没有任何元组,CombinerAggregator将返回zero函数中定义的元组。比如,下面是Count聚合器的实现:

public class Count implements CombinerAggregator {    public Long init(TridentTuple tuple) {        return 1L;    }    public Long combine(Long val1, Long val2) {        return val1 + val2;    }    public Long zero() {        return 0L;    }}

ReducerAggregator接口的定义如下:

public interface ReducerAggregator extends Serializable {    T init();    T reduce(T curr, TridentTuple tuple);}

ReducerAggregator通过init函数得到一个初始的值,然后对每个输入元组调用reduce方法计算值,产生一个元组作为输出。比如Count的ReducerAggregator实现如下:

public class Count implements ReducerAggregator {    public Long init() {        return 0L;    }    public Long reduce(Long curr, TridentTuple tuple) {        return curr + 1;    }}

最常用的聚合器的接口是Aggregator,它的定义如下:

public interface Aggregator extends Operation {    T init(Object batchId, TridentCollector collector);    void aggregate(T state, TridentTuple tuple, TridentCollector collector);    void complete(T state, TridentCollector collector);}

Aggregator能够发射任意数量,任意字段的元组。并且可以在执行期间的任何时候发射元组,它的执行流程如下:

  1. 处理batch之前调用init方法,init函数的返回值是一个表示聚合状态的对象,该对象会传递到aggregate和complete函数;

  2. 每个在batch分区中的元组都会调用aggregate方法,该方法能够更新聚合状态并且发射元组;

  3. 当batch分区中的所有元组都被aggregate函数处理完时调用complete函数。

下面是使用Aggregator接口实现的Count聚合器:

public class CountAgg extends BaseAggregator {    static class CountState {        long count = 0;    }    public CountState init(Object batchId, TridentCollector collector) {        return new CountState();    }    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {        state.count+=1;    }    public void complete(CountState state, TridentCollector collector) {        collector.emit(new Values(state.count));    }}

有些时候,我们需要通知执行很多个聚合器,则可以使用如下的链式调用执行:

mystream.chainedAgg()        .partitionAggregate(new Count(), new Fields("count"))        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))        .chainEnd()

上面的代码将会在每一个分区执行Count和Sum聚合器,输出结果是包含count和sum两个字段的元组。

最重要的区别是CombinerAggregator,它是先在partition上做partial aggregate,然后再将这些部分聚合结果通过global分区到一个总的分区,在这个总的分区上对结果进行汇总。

groupBy()分组操作

首先它包含两个操作,一个是分区操作,一个是分组操作。

如果后面是partitionAggregate()的话,就只有分组操作:在每个partition上分组,分完组后,在每个分组上进行聚合;

如果后面是aggregate()的话,先根据partitionBy分区,在每个partition上分组,,分完组后,在每个分组上进行聚合。

parallelismHint并发度的介绍

它设置它前面所有操作的并发度,直到遇到某个repartition操作为止。

topology.newStream("spout", spout)      .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))      .parallelismHint(5)      .each(new Fields("actor", "text"), new Utils.PrintFilter());

意味着:parallelismHit之前的spout,each都是5个相同的操作一起并发,对,一共有5个spout同时发射数据,其实parallelismHint后面的each操作,也是5个并发。分区操作是作为Bolt划分的分界点的。

如果想单独设置Spout怎么办?要在Spout之后,Bolt之前增加一个ParallelismHint,并且还要增加一个分区操作:

topology.newStream("spout", spout)          .parallelismHint(2)          .shuffle()          .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))          .parallelismHint(5)          .each(new Fields("actor", "text"), new Utils.PrintFilter());

很多人只是设置了Spout的并发度,而没有调用分区操作,这样是达不到效果的,因为Trident是不会自动进行分区操作的。像我之前介绍的,先分区,再设置并发度。如果Spout不设置并发度,只设置shuffle,默认是1个并发度,这样后面设置5个并发度不会影响到Spout,因为并发度的影响到shuffle分区操作就停止了。

例子

groupBy+aggregate+parallelismHint

package com.demo;import java.util.HashMap;import java.util.Map;import backtype.storm.tuple.Values;import storm.trident.operation.BaseAggregator;import storm.trident.operation.TridentCollector;import storm.trident.operation.TridentOperationContext;import storm.trident.tuple.TridentTuple;public class MyAgg extends BaseAggregator> {                /**         *          */        private static final long serialVersionUID = 1L;                /**         * 属于哪个分区         */        private int partitionId;        /**         * 分区数量         */        private int numPartitions;        private String batchId;                @SuppressWarnings("rawtypes")        @Override        public void prepare(Map conf, TridentOperationContext context) {                partitionId = context.getPartitionIndex();                numPartitions = context.numPartitions();                        }        public void aggregate(Map val, TridentTuple tuple,                        TridentCollector collector) {                String word = tuple.getString(0);                Integer value = val.get(word);                if (value == null) {                        value = 0;                }                value++;                // 把数据保存到一个map对象中                val.put(word, value);                System.err.println("I am partition [" + partitionId                                + "] and I have kept a tweet by: " + numPartitions + " " + word + " " +batchId);        }        public void complete(Map val, TridentCollector collector) {                collector.emit(new Values(val));        }        public Map init(Object arg0, TridentCollector arg1) {                this.batchId = arg0.toString();                return new HashMap();        }}
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,                                new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));                spout.setCycle(false);                TridentTopology tridentTopology = new TridentTopology();                tridentTopology                                .newStream("spout", spout)                                .shuffle()                                .groupBy(new Fields("sentence"))                                .aggregate(new Fields("sentence"), new MyAgg(),                                                new Fields("Map"))                            .parallelismHint(2)
I am partition [0] and I have kept a tweet by: 2 a 1:0I am partition [0] and I have kept a tweet by: 2 a 1:0I am partition [0] and I have kept a tweet by: 2 a 2:0I am partition [1] and I have kept a tweet by: 2 d 2:0I am partition [0] and I have kept a tweet by: 2 e 3:0I am partition [1] and I have kept a tweet by: 2 f 3:0

groupBy+partitionAggregate+parallelismHint

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,                                new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));                spout.setCycle(false);                TridentTopology tridentTopology = new TridentTopology();                tridentTopology                                .newStream("spout", spout)                                .shuffle()                                .groupBy(new Fields("sentence"))                                .partitionAggregate(new Fields("sentence"), new MyAgg(),                                                new Fields("Map")))                                .toStream()                            .parallelismHint(2)
I am partition [0] and I have kept a tweet by: 2 a 1:0I am partition [1] and I have kept a tweet by: 2 a 1:0I am partition [0] and I have kept a tweet by: 2 a 2:0I am partition [1] and I have kept a tweet by: 2 d 2:0I am partition [0] and I have kept a tweet by: 2 e 3:0I am partition [1] and I have kept a tweet by: 2 f 3:0

由于shuffle已经把tuple平均分配给5个partition了,用groupBy+partitionAggregate来聚合又没有partitionBy分区的作用,所以,直接在5个分区上进行聚合,结果就是每个分区各有一个tuple。

而用groupBy+aggregate,虽然也是shuffle,但是由于具有partitiononBy分区的作用,值相同的tuple都分配到同一个分区,结果就是每个分区根据不同的值来做汇聚。

aggregate+parallelismHint(没有groupBy)

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,                                new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));                spout.setCycle(false);                TridentTopology tridentTopology = new TridentTopology();                tridentTopology                                .newStream("spout", spout)                                .shuffle()                                .aggregate(new Fields("sentence"), new MyAgg(),                                                new Fields("Map"))                            .parallelismHint(2)
I am partition [1] and I have kept a tweet by: 2 a 1:0I am partition [1] and I have kept a tweet by: 2 a 1:0I am partition [0] and I have kept a tweet by: 2 a 2:0I am partition [0] and I have kept a tweet by: 2 d 2:0I am partition [1] and I have kept a tweet by: 2 e 3:0I am partition [1] and I have kept a tweet by: 2 f 3:0

partitionAggregate+parallelismHint(没有groupBy操作)

                FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,                                new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f"));                spout.setCycle(false);                TridentTopology tridentTopology = new TridentTopology();                tridentTopology                                .newStream("spout", spout)                                .shuffle()                                .partitionAggregate(new Fields("sentence"), new MyAgg(),                                                new Fields("Map"))                                .toStream()                            .parallelismHint(2)
I am partition [1] and I have kept a tweet by: 2 a 1:0I am partition [0] and I have kept a tweet by: 2 a 1:0I am partition [1] and I have kept a tweet by: 2 a 2:0I am partition [0] and I have kept a tweet by: 2 d 2:0I am partition [0] and I have kept a tweet by: 2 e 3:0I am partition [1] and I have kept a tweet by: 2 f 3:0

我们可以发现,partitionAggregate加上groupBy,或者不加上groupBy,对结果都一样:groupBy对于partitionAggregate没有影响。但是对于aggregate来说,加上groupBy,就不是做全局聚合了,而是对分组做聚合;不加上groupBy,就是做全局聚合。

如果spout设置并行度,但是没有加shuffle,不会起作用,分区默认为1,;如果不设置并行度并且没有加shuffle,分区默认为1。

Merge和Joins

api的最后一部分便是如何把各种流汇聚到一起。最简单的方式就是把这些流汇聚成一个流。我们可以这么做:

topology.merge(stream1, stream2, stream3);

Trident指定新的合并之后的流中的字段为stream1中的字段。
另一种合并流的方式就是join。一个标准的join就像是一个sql,必须有标准的输入,因此,join只针对符合条件的Stream。join应用在来自Spout的每一个小Batch中。

下面的例子中,stream1流包含key,val1,val2三个字段,stream2流包含x,val1两个字段:

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));

stream1流的key字段与stream2流的x字段组join操作,另外,Trident要求所有新流的输出字段被重命名,因为输入流可能包含相同的字段名称。连接流发射的元组将会包含:

  1. 连接字段的列表。在上面的例子中,字段key对应stream1的key,stream2的x;

  2. 来自所有流的所有非连接字段的列表,按照传递到连接方法的顺序排序。在上面的例子中,字段a与字段b对应stream1的val1和val2,c对应于stream2的val1.

当join的是来源于不同Spout的stream时,这些Spout在发射数据时需要同步,一个Batch所包含的tuple会来自各个Spout。

以上是"storm中trident是什么"这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注行业资讯频道!

0