正如微服务架构的发展,使得开发人员们最终将服务网格抽取出来,单独形成一个边车模式(一文读懂Service Mesh:边车模式简介),从而使得大部分开发者可以专注于业务逻辑的开发,而不用去过多了解底层的微服务相关功能实现一样,随着大数据技术的发展,越来越多的分布式数据处理框架,从最早的Hadoop MapReduce,到Storm、Spark、以及后来出现的Flink、Apex等,能够解决的问题越来越多,性能也在不断进行优化。但是由此给研发人员带来的学习成本也是成倍增加。同时对于旧的系统适配,代价也非常大。
因此,Apache Beam项目应运而生,它是Google在2016年贡献给Apaceh基金会的开源项目。它并不是一套新的框架,不涉及具体的执行引擎的实现。相反,它是一套用于统一静态数据批处理和流式数据处理的编程范式。旨在为不同形态的数据集提供简单灵活、功能丰富、易于学习理解的SDK。
下面我们就来看看Apaceh Beam的一些特点。
一、基本架构
Apache Beam由Beam SDK和Beam Runner组成。前者定义了开发分布式数据处理任务业务逻辑的API接口,由其生成的分布式数据处理任务交给具体的Beam Runner执行引擎。而其支持的执行引擎目前已经包括了Apache Flink、Apache Sprak以及Google DataFlow Cloud。而Hadoop、Storm、Gearpump等执行引擎也在发展之中。Apache Beam的基本架构如下图所示:
目前Beam SDK支持的API接口主要还是由Java语言实现,Python、Go等其它版本也在快速开发之中。目前Google DataFlow Cloud是执行Beam SDK标准最为全面的执行引擎。而在开源执行引擎中,Flink则是包括最多功能的。但是需要指出的是,Beam SDK是一个大而全的SDK,在实现中并不是所有的执行引擎都有可能完全支持其中所有的功能。就如同刀可以切东西一样,让纤细锋利的手术刀去支持砍大骨头的操作完全是没必要。因此作为开发人员应该熟悉各类执行引擎适用的情境,从而选择正确的执行引擎。
二、一些基本概念
1、PCollection:数据集,数据处理就是在对各种PCollection进行转换和处理。
2、PTransform:数据变换器,用来定义数据是怎么被处理的,用来处理PCollection。
3、Pipeline:流水线,是由PTransform和PCollection组成的集合,可以理解为它定义了数据处理从源到目标的整个过程。
4、Runner:执行引擎。
三、Beam SDK的调用流程
通常来讲,Beam SDK的使用流程都是差不多的,主要是三个部分:
1、首先创建pipeline
Pipeline p = Pipeline.create(options);
2、然后应用实施各种业务逻辑需要的变换
p.apply(PTransform t);
这里面的PTransform<InputT, OutputT>是输入一个 InputT并输出一个OutputT的操作。
常用的 PTransforms包括:
u基本操作,如TextIO.Read,Create
u处理和变换操作,如ParDo, GroupByKey, CoGroupByKey, Combine 和 Count
u输出类型的,如TextIO.Write
u用户也可以根据自己的业务需要,自定义应用相关的复合PTransform,复合变换只需实现 expand(InputT) 方法,由输入计算输出
简单来说,就是一个数据集经过一系列的变换,变成另一个数据集所需要的方法和步骤。这其中,多个PTransform可以处理一个数据集,一个PTransform也可以生成多个数据体。
3、最后由Runner实现run()方法执行。
p.run().waitUntilFinish();
一个简单的计数单词数的示例如下:
public class WordCountDirect {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
PCollection<String> lines = pipeline.apply("read from file",
TextIO.read().from("pkslow.txt"));
PCollection<List<String>> wordList =
lines.apply(MapElements.via(new SimpleFunction<String, List<String>>() {
@Override
public List<String> apply(String input) {
List<String> result = new ArrayList<>();
char[] chars = input.toCharArray();
for (char c:chars) {
result.add(String.valueOf(c));
}
return result;
}
}));
PCollection<String> words = wordList.apply(Flatten.iterables());
PCollection<KV<String, Long>> wordCount =
words.apply(Count.perElement());
wordCount.apply(MapElements.via(new SimpleFunction<KV<String, Long>,
String>() {
@Override
public String apply(KV<String, Long> count) {
return String.format("%s : %s", count.getKey(), count.getValue());
}
})).apply(TextIO.write().to("word-count-result"));
pipeline.run().waitUntilFinish();
}
}
整个过程就是对于数据集进行各种变换处理,最终得到所需要的数据集的过程。开发起来非常简单。
喜欢本文的话,欢迎关注活在信息时代哦:)