《从零开始学Storm》
apachecn/storm-doc-zh
Trident是在Storm的基础上,一个以实时计算为目标的 high-level abstraction (高度抽象)。 它在提供处理大吞吐量数据能力(每秒百万次消息)的同时, 也提供了低延时分布式查询和 stateful stream processing (有状态流式处理)的能力。
Trident 提供了
joins (连接), aggregations(聚合), grouping(分组), functions, 以及 filters 等
能力… 除此之外, Trident 还提供了一些专门的 primitives (原语), 从而在基于数据库或者其他存储的前提下来应付有状态的递增式处理. Trident 也提供
一致性(consistent)、有且仅有一次(exactly-once)
等语义。
Trident API
“Stream” 是 Trident 中的核心数据模型, 它被当做一系列的 batch 来处理.在 Storm 集群的节点之间, 一个 stream 被划分成很多 partition (分区), 对Stream(流)的
operation (操作)
是在每个 partition 上并行进行的.
- 也有说法:TridentTuple 是 Trident 中的核心数据模型。
- 一个 stream 被划分成很多 partition : partition 是 stream 的一个子集, 里面可能有多个 batch ,
上.
一个 batch 也可能位于不同的 partition
Trident 有 5 类操作:
- Partition-local operations(本地分区操作): 对每个 partition 的局部操作,
不产生网络传输
- Repartitioning operations(重分区操作): 对 stream (数据流)的重新划分(仅仅是划分, 但不改变内容),
产生网络传输
- Aggregation operations (聚合操作):部分进行网络传输的 .
- Operations on grouped streams (流分组操作):
- Merges & joins(合并和连接操作):
类似SQL中的join
本地分区操作
Partition-local operations (分区本地操作)不涉及网络传输, 并且独立地应用于每个 batch partition (batch分区).
Function(函数)
一个 function 收到一个输入 tuple 后可以
输出 0 或多个 tuple
, 输出 tuple 的字段被
追加
到接收到的输入 tuple 后面.如果对某个 tuple 执行 function 后没有输出 tuple, 则该 tuple 被 filter(过滤), 否则, 就会为每个输出 tuple 复制一份输入 tuple 的副本.
例:假设有如下的 function,
public class MyFunction extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
//如果tuple.getInteger(0) > 1 ,会将一个tuple变为多条
for(int i=0; i < tuple.getInteger(0); i++) {
collector.emit(new Values(i));
}
}
}
执行如下代码,
通过输入元组为Fields("b")计算,追加的Fields("d")
:
public void a_function2(){
/**
* before after
* [1, 2, 3] [1, 2, 3, 0]
* ==> [1, 2, 3, 1]
* [4, 1, 6] [4, 1, 6, 0]
* [3, 0, 8] 过滤掉.......
*/
dummyStream.each(new Fields("b"), new MyFunction(), new Fields("d"));
}
其中,
[1, 2, 3] 输出为2条tuple,[3, 0, 8] 则被过滤掉。
Filter(过滤器)
Filters 收到一个输入 tuple , 并决定是否保留该 tuple。
例:假设有如下的 filter:
仅保留 number0 + number1 为偶数的tuple.
,
class CheckEvenSumFilter extends BaseFilter{
@Override
public boolean isKeep(TridentTuple tuple) {
int number0 = tuple.getInteger(0);
int number1 = tuple.getInteger(1);
int sum = number0 + number1;
return sum % 2 == 0;
}
}
执行如下代码:
public void b_filter(){
/**
* before after
* [1,1,3,11] ====> [1,1,3,11]
* [2,2,7,1] [2,2,7,1]
* [2,3,4,5] 过滤剔除....
*/
dummyStream.each(new Fields("a", "b"), new CheckEvenSumFilter());
}
其中tuple:
[2,3,4,5] 被过滤剔除.
map and flatMap
map
map 返回一个 stream , 它包含将给定的 mapping function (映射函数)应用到 stream 的 tuples 的结果. 这个可以用来对 tuples 应用
one-one transformation (一一变换
).
例:如果有一个sentences流,你想将它转换为大写语句,可以使用如下 map函数
class UpperCase implements MapFunction {
@Override
public Values execute(TridentTuple input) {
return new Values(input.getString(0).toUpperCase());
}
}
执行如下代码:
public void c_map(){
dummyStream.map(new UpperCase());
//用新的输出字段名称替换旧的字段 === 相当于 fields-rename, fields个数不变
dummyStream.map(new UpperCase(), new Fields("uppercased"));
}
注意,
第2个字段Fields outputFields可选。
flatMap
flatMap 类似于 map , 但具有将
one-to-many transformation (一对多变换)
应用于 values of the stream (流的值)的效果, 然后将所得到的元素 flattening (平坦化)为新的 stream .
例: 如果有sentences 流, 并且您想将其转换成 words流,可以使用如下 flatmap函数,
class Split implements FlatMapFunction {
@Override
public Iterable<Values> execute(TridentTuple input) {
List<Values> valuesList = new ArrayList<>();
//根据“ ”,将sentences 拆分为 words
for (String word : input.getString(0).split(" ")) {
valuesList.add(new Values(word));
}
return valuesList;
}
}
执行如下代码:
public void d_flatmap(){
dummyStream.flatMap(new Split());
dummyStream.flatMap(new Split(), new Fields("word"));
}
peek
peek 可用于在每个 trident tuple 流过 stream 时对其执行 additional action (附加操作). 这可能对于在流经 pipeline 中某一点的元组来 debugging (调试) tuples 是有用的.
dummyStream.a.b.c.peek(new Consumer() {
@Override
public void accept(TridentTuple input) {
//打印TridentTuple 信息
System.out.println(input.getString(0));
}
});
min and minBy
min 和 minBy operations (操作)在 trident stream 中的 a batch of tuples (一批元组)的
每个 partition (分区)
上返回 minimum value (最小值).
它的api如下:
//1 min方法:需要显示的声明Comparator
public Stream min(Comparator<TridentTuple> comparator) {...}
//2.1 minBy:通过指定具体字段来比较
public Stream minBy(String inputFieldName) {...}
//2.2 minBy:通过指定具体字段 & 显示声明Comparator
public <T> Stream minBy(String inputFieldName, Comparator<T> comparator) {...}
max and maxBy
max 和 maxBy operations (操作)在 trident stream 中的一 batch of tuples (批元组)的
每个 partition (分区)上返回 maximum (最大值)
.,它的api方法:
public Stream max(Comparator<TridentTuple> comparator) {...}
public Stream maxBy(String inputFieldName) {...}
public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator) {...}
partitionAggregate分区聚合
partitionAggregate是运行在每个batch元组partition(分区)上的Function(函数),又不同于上面的 functions 操作, partitionAggregate 的输出 tuple 将会
取代
收到的输入 tuple 。
例:假设 input stream 包括字段 [“a”, “b”] , 并有下面的 partitions of tuples (元组 partitions ):
Partition 0:
["a", 1]
["b", 2]
Partition 1:
["a", 3]
["c", 8]
Partition 2:
["e", 1]
["d", 9]
["d", 10]
执行如下代码,对输入元组b字段进行求和:
mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
,
则输出流
只包含sum字段
,即求和结果:
Partition 0:
[3]
Partition 1:
[11]
Partition 2:
[20]
上面代码中的 new Sum() 实际上是一个
aggregator (聚合器)
, 定义一个聚合器有三种不同的接口:
- CombinerAggregator
- ReducerAggregator
- Aggregator .
CombinerAggregator
CombinerAggregator
接口定义如下:
public interface CombinerAggregator<T> extends Serializable {
T init(TridentTuple tuple);
T combine(T val1, T val2);
T zero();
}
一个 CombinerAggregator 仅
输出一个 tuple(该 tuple 也只有一个字段)
,
- 每收到一个输入 tuple, CombinerAggregator 就会执行 init() 方法(该方法返回一个初始值)
- 调用 combine() 方法汇总这些值, 直到剩下一个值为止(聚合值)
- .如果 partition 中没有 tuple, CombinerAggregator 会发送 zero() 的返回值.
// 聚合器 Count vs Sum
public class Count implements CombinerAggregator<Long> {
@Override
public Long init(TridentTuple tuple) {
//count初始值为1
return 1L;
}
@Override
public Long combine(Long val1, Long val2) {
return val1 + val2;
}
@Override
public Long zero() {
return 0L;
}
}
public class Sum implements CombinerAggregator<Number> {
@Override
public Number init(TridentTuple tuple) {
//sum初始值为0
return (Number) tuple.getValue(0);
}
@Override
public Number combine(Number val1, Number val2) {
return Numbers.add(val1, val2);
}
@Override
public Number zero() {
return 0;
}
}
当使用方法代替
aggregate()
方法时, 就能看到 CombinerAggregation 带来的好处.这种情况下, Trident 会自动优化计算:
partitionAggregate()
.
先做局部聚合操作, 然后再通过网络传输 tuple 进行全局聚合
ReducerAggregator
ReducerAggregator
接口如下:
public interface ReducerAggregator<T> extends Serializable {
T init();
T reduce(T curr, TridentTuple tuple);
}
ReducerAggregator 使用 init() 方法产生一个初始值, 对于每个输入 tuple , 依次迭代这个初始值, 最终产生一个单值输出 tuple。
ReducerAggregator 也可以与 persistentAggregate
一起使用, 后续会有涉及.
Aggregator
聚合
最通用的接口
是
Aggregator
,Aggregator的接口定义如下:
public interface Aggregator<T> extends Operation {
T init(Object batchId, TridentCollector collector);
void aggregate(T state, TridentTuple tuple, TridentCollector collector);
void complete(T state, TridentCollector collector);
}
Aggregator 可以输
出任意数量的 tuple , 且这些 tuple 的字段也可以有多个
。执行过程中的
任何时候都可以输出 tuple (三个方法的参数中都有 collector )
. . Aggregator 的执行方式如下:
-
: 处理每个 batch 之前调用一次 init() 方法。init的返回值是一个表示init方法
的对象,该对象会传递给后续的aggregate和complete方法。聚合状态
!!!在实际的实践过程中发现, init()在batch partition第一次处理时被调用。
-
:每个收到一个该 batch 中的输入aggregate方法
就会调用一次 aggregate, 该方法中可以tuple
,并更新状态
发出元组.可选地
-
: 当batch partition的所有tuple都已经被complete方法
处理后被调用。aggregate方法
例:下面的代码将 Count 作为 Aggregator 实现:
public class CountAgg extends BaseAggregator<CountState> {
static class CountState {
long count = 0;
}
public CountState init(Object batchId, TridentCollector collector) {
return new CountState();
}
public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
state.count+=1;
}
public void complete(CountState state, TridentCollector collector) {
collector.emit(new Values(state.count));
}
}
aggregators 链式用法
有时需要同时执行
multiple aggregators (多个聚合)操作
, 这个可以使用 chaining (链式)操作完成:
mystream.chainedAgg()
.partitionAggregate(new Count(), new Fields("count"))
.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
.chainEnd()
这段代码将会对每个 partition 执行 Count 和 Sum aggregators (聚合器), 并输出一个tuple 字段 [“count”, “sum”].
stateQuery(状态查询) and partitionPersist(状态持久化)
stateQuery 和 partitionPersist 分别 query (查询)和 update (更新) sources of state (状态源)
projection(投影)
经 Stream 中的 project 方法处理后的 tuple 仅保持指定字段(相当于过滤字段)
public void z_projections(){
//假设当前dummyStream有元组字段 [x,y ,z], 指定x后, 只保留[x]
/**
* before after
* [1,4,7] ====> [1]
* [4,1,3] [4]
*/
dummyStream.project(new Fields("x"));
}
重分区操作
Repartitioning operations (重新分区操作)
运行一个函数来更改元组在任务之间的分区。
分区的数量可以通过重分区操作而改变,重分区操作
需要网络传输
,以下是充分去的方法:
-
: 随机将 tuple 均匀地分发到目标 partition 里.shuffle
-
broadcast
: 每个 tuple 被复制到所有的目标 partition 里,
这在 DRPC 中非常有用,例如需要对每个分区的数据做一个stateQuery操作。
-
:对每个 tuple 选择 partition 的方法是:(该 tuple 指定字段的 hash 值) mod (目标 partition 的个数), 该方法partitionBy
的 tuple 能够被发送到同一个 partition .确保指定字段相同
.但同一个 partition 里可能有字段不同的 tuple(因为mod后的值相同)
-
: 所有的 tuple 都被发送到同一个 partitionglobal
-
: 确保同一个 batch 中的 tuple 被发送到相同的 partition 中.batchGlobal
-
: 此方法采用实现partition
的自定义分区函数.org.apache.storm.grouping.CustomStreamGrouping
聚合操作
Trident 中有 aggregate() 和 persistentAggregate() 方法对流进行聚合操作。
-
:在每个 batch 上独立的执行,aggregate()
-
: 对所有 batch 中的所有 tuple 进行聚合, 并将结果存入 state 源中.persistemAggregate()
aggregate() 对 Stream 做全局聚合时的两种方式:
-
:流先被重新划分成一个大分区(仅有一个 partition ), 然后对这个 partition 做聚合操作。当使用 ReduceAggregator 或者 Aggregator 聚合器时
-
:Trident 首先对每个 partition 局部聚合, 然后将所有这些 partition 重新划分到一个 partition 中, 完成全局聚合.当使用 CombinerAggregator 时
.CombinerAggregator 更高效, 推荐使用
例:使用 aggregate() 对一个 batch 操作得到一个全局的 count
mystream.aggregate(new Count(), new Fields("count"));
流分组操作
groupBy 操作:
- 首先对流中的指定字段做 partitionBy 操作, 让指定字段相同的 tuple 能被发送到同一个 partition 里.
- 然后在每个 partition 里根据指定字段值对该分区里的 tuple 进行分组.
下面演示了 groupBy 操作的过程:

- 如果你在一个 grouped stream 上做聚合操作, 聚合操作将会在
.
每个 group (分组)内进行, 而不是整个 batch 上
- GroupStream 类中也有 persistentAggregate 方法, 该方法聚合的结果将会存储在一个
的 MapState 中.
key 值为分组字段(即 groupBy 中指定的字段)
- 普通的 stream 一样, groupstream 上的聚合操作也可以使用 chained (链式语法)
合并与连接
merge
将几个 stream 汇总到一起, 最简单的汇总方法是将他们合并成一个 stream , 这个可以通过 TridentTopology 中的 merge 方法完成, 就像这样:
topology.merge(stream1, stream2, stream3);
Trident会重命名新的输出字段,合并流并
以第一个流的 output fields (输出字段)来命名
。
join
合并流的另一种方法是
join(连接,类似sql中的join)
,它 需要
有限的输入
,针对无限流是没有意义的。
join仅适用于从 spout 发出的每个 small batch 中.
例:以下是包含字段 [“key”, “val1”, “val2”] 的 stream 和包含 [“x”, “val1”] 的另一个 stream 之间的 join 示例:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
stream1中key字段和stream2中x字段进行连接。
另外Trident要求所有新流的输出字段被重命名,因为各个输入流之间可能会存在重复的字段名称。从 join 发出的 tuples 将包含:
- list of join fields (连接字段列表).在这种情况下,
"key" 对应于 stream1 的 "key" , stream2 对应于 "x" .
- 接下来, 按照 streams 如何传递到 join 方法的顺序, 所有流中的
.在这种情况下, “a” 和 “b” 对应于来自 stream1 的 “val1” 和 “val2” , “c” 对应于来自 stream2 的 “val1” .所有非连接字段的列表