目錄:apache beam 個人使用經驗總結目錄和入門指導(Java)
如果我們希望給某個PCollection資料集輸入1個 計算後 的結果,則就需要用到旁路輸入。
例如要計算某個資料集的方差,其公式為
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwFciV2dsQXYtJ3bm9CX9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5SN0kjMzATYjdTM5IjNwEzMzYzX1QzM1kDM2IzLclDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
這個計算過程就需要用到旁路輸入,來将平均值u傳遞給資料集中的每個數字進行計算。
我們下面這個資料集為例
PCollection<Integer> numbers = pipeline.apply(Create.of(10,20,40,70,80));
建立旁路輸入視窗
資料集的平均值就是旁路輸入,注意這個旁路輸入的值必須是單個值,即資料集聚合後的1個值。建立方式如下:
// 使用bema自帶的平均值sdk計算平局值
Combine.Globally<Double, Double> combineGlobally = Mean.<Double>globally();
// 加上asSingletonView()轉為旁路輸入視圖
PCollectionView<Double> meanView = numbers.apply(combineGlobally.asSingletonView());
向PTransform對象組裝旁路輸入
接着建立均值差平方的轉化類
// 建立均值內插補點 平方的轉化類
ParDo.SingleOutput<Double, Double> meanSquareTrans = ParDo.<Double, Double>of(new DoFn<Double, Double>() {
@ProcessElement
public void precessElement(ProcessContext context) {
// 根據旁路輸入的引用,取出旁路輸入值
Double mean = context.sideInput(meanView);
Double number = context.element();
Double answer = Math.abs(number - mean) * Math.abs(number - mean);
context.output(answer);
}
});
然後在apply組裝的時候,用withSideInput将旁路輸入組裝進去,便可進行計算
// 組裝轉化類
// 注意要把meanView作為旁路輸入組裝進去
// 得到 |數值-平均值|^2 的資料集
PCollection<Double> meanSquare = numbers.apply(meanSquareTrans.withSideInputs(meanView));
注意事項: 如果是基于分布式計算引擎例如spark、flink, 需要确認運作在各executor時, 能否加載到meanView這個PCollectionView引用,如果加載不到,則需要作為私有成員放進DoFn子類中去擷取
combine聚合操作組裝旁路輸入
計算方差需要把meanSquare資料集裡的元素一一相加再除去元素總數,是以也要用到旁路輸入。
// 計算資料集内的元素數量
PCollectionView<Long> numberCount = numbers.apply(Combine.globally(Count.<Double>combineFn()).asSingletonView());
// 如果是自定義Combine類,numberCount需要作為成員傳入,否則分布式計算時無法擷取
PCollection<Double> variance = meanSquare.apply(
Combine.globally(
new CaculateVarianceCombineFn(numberCount)
).withSideInputs(numberCount)
);
因為之前介紹的combine方法無法擷取context,因為需要改用另一種combineFn,可以拿到上下文文本context,并擷取旁路輸入
static class CaculateVarianceCombineFn extends CombineWithContext.CombineFnWithContext<Double, Double, Double> {
private PCollectionView<Long> numberCountView;
public CaculateVarianceCombineFn(PCollectionView<Long> numberCountView) {
this.numberCountView = numberCountView;
}
// 主要要實作defaultValue這個接口
@Override
public Double defaultValue() {
return Double.valueOf(0);
}
@Override
public Double createAccumulator(CombineWithContext.Context c) {
return Double.valueOf(0);
}
@Override
public Double addInput(Double accumulator, Double input, CombineWithContext.Context c) {
return accumulator + input;
}
@Override
public Double mergeAccumulators(Iterable<Double> accumulators, CombineWithContext.Context c) {
Double mergeResult = createAccumulator(c);
for(Double accmulator : accumulators) {
mergeResult += accmulator;
}
return mergeResult;
}
// 計算最終聚合結果
@Override
public Double extractOutput(Double accumulator, CombineWithContext.Context c) {
Long numberCount = c.sideInput(numberCountView);
System.out.println("end result=" + accumulator / (numberCount - 1));
return accumulator / (numberCount - 1);
}
}
最終計算拓撲圖
計算拓撲圖如下:
numbers
mean
meanSquare