天天看点

Storm专题二:Storm Trident API 使用详解

一、概述

     storm trident中的核心数据模型就是“stream”,也就是说,storm trident处理的是stream,但是实际上stream是被成批处理的,stream被切分成一个个的batch分布到集群中,所有应用在stream上的函数最终会应用到每个节点的batch中,实现并行计算,具体如下图所示:

Storm专题二:Storm Trident API 使用详解

在trident中有五种操作类型:

apply locally:本地操作,所有操作应用在本地节点数据上,不会产生网络传输     

repartitioning:数据流重定向,单纯的改变数据流向,不会改变数据内容,这部分会有网络传输

aggragation:聚合操作,会有网络传输

grouped streams上的操作

merge和join

小结:上面提到了trident实际上是通过把函数应用到每个节点的batch上的数据以实现并行,而应用的这些函数就是tridentapi,下面我们就具体介绍一下tridentapi的各种操作。  

二、trident五种操作详解

2.1 apply locally本地操作:操作都应用在本地节点的batch上,不会产生网络传输

2.1.1 functions:函数操作

     函数的作用是接收一个tuple(需指定接收tuple的哪个字段),输出0个或多个tuples。输出的新字段值会被追加到原始输入tuple的后面,如果一个function不输出tuple,那就意味这这个tuple被过滤掉了,下面举例说明:

定义一个function:

     小结:function实际上就是对经过function函的tuple做一些操作以改变其内容。

比如我们处理一个“mystream”的数据流,它有三个字段分别是[“a”, “b”, “c”] ,数据流中tuple的内容是:

     [1,

2, 3] [4, 1, 6] [3, 0, 8]

我们运行我们的function:  

     它意思是接收输入的每个tuple “b”字段得值,把函数结算结果做为新字段“d”追加到每个tuple后面,然后发射出去。

最终运行结果会是每个tuple有四个字段[“a”, “b”, “c”, “d”],每个tuple的内容变成了:

2, 3, 0] [1, 2, 3, 1] [4, 1, 6, 0]

    小结:我们注意到,如果一个function发射多个tuple时,每个发射的新tuple中仍会保留原来老tuple的数据。

2.1.2 filters:过滤操作

filters很简单,接收一个tuple并决定是否保留这个tuple。举个例子,定义一个filter:

假设我们的tuples有这个几个字段 [“a”,

“b”, “c”]: 

     [1, 2, 3] [2, 1, 1] [2, 3, 4]

然后运行我们的filter:

则最终得到的tuple是 :

     [2,

1, 1]

     说明第一个和第三个不满足条件,都被过滤掉了。

     小结:filter就是一个过滤器,它决定是否需要保留当前tuple。

2.1.3 partitionaggregate

    partitionaggregate的作用对每个partition中的tuple进行聚合,与前面的函数在原tuple后面追加数据不同,partitionaggregate的输出会直接替换掉输入的tuple,仅数据partitionaggregate中发射的tuple。下面举例说明:

定义一个累加的partitionaggregate:

假设我们的stream包含两个字段 [“a”,

“b”],各个partition的tuple内容是:

     ```

partition 0: [“a”, 1] [“b”, 2]

     partition 1: [“a”, 3] [“c”, 8]

     partition 2: [“e”, 1] [“d”, 9] [“d”, 10] ```

输出的内容只有一个字段“sum”,值是:

     ```

partition 0: [3]

     partition 1: [11]

     partition 2: [20] ```

    tridentapi提供了三个聚合器的接口:combineraggregator, reduceraggregator,

and aggregator.

我们先看一下combineraggregator接口:   

    combineraggregator接口只返回一个tuple,并且这个tuple也只包含一个field。init方法会先执行,它负责预处理每一个接收到的tuple,然后再执行combine函数来计算收到的tuples直到最后一个tuple到达,当所有tuple处理完时,combineraggregator会发射zero函数的输出,举个例子:

定义一个combineraggregator实现来计数:  

     小结:当你使用aggregate 方法代替partitionaggregate时,combineraggregator的好处就体现出来了,因为trident会自动优化计算,在网络传输tuples之前做局部聚合。

我们再看一下reduceraggregator:

     reduceraggregator通过init方法提供一个初始值,然后为每个输入的tuple迭代这个值,最后生产处一个唯一的tuple输出,下面举例说明:

定义一个reduceraggregator接口实现技术器的例子:

    aggregator接口可以发射含任意数量属性的任意数据量的tuples,并且可以在执行过程中的任何时候发射:

init:在处理数据之前被调用,它的返回值会作为一个状态值传递给aggregate和complete方法

aggregate:用来处理每一个输入的tuple,它可以更新状态值也可以发射tuple

complete:当所有tuple都被处理完成后被调用     

定义一个实现来完成一个计数器:

shuffle:通过随机分配算法来均衡tuple到各个分区

broadcast:每个tuple都被广播到所有的分区,这种方式在drcp时非常有用,比如在每个分区上做statequery

partitionby:根据指定的字段列表进行划分,具体做法是用指定字段列表的hash值对分区个数做取模运算,确保相同字段列表的数据被划分到同一个分区

global:所有的tuple都被发送到一个分区,这个分区用来处理整个stream

batchglobal:一个batch中的所有tuple都被发送到同一个分区,不同的batch会去往不同的分区

partition:通过一个自定义的分区函数来进行分区,这个自定义函数实现了 backtype.storm.grouping.customstreamgrouping

2.3 aggragation聚合操作

和paritionaggregate一样,aggregators的聚合也可以串联起来,但是如果你把一个 combineraggregator和一个非combineraggregator串联在一起,trident是无法完成局部聚合优化的。

另一种合并流的方式就是join。一个标准的join就像是一个sql,必须有标准的输入,因此,join只针对符合条件的stream。join应用在来自spout的每一个小batch中。join时候的tuple会包含: