天天看點

apache beam入門之旁路輸入

目錄:apache beam 個人使用經驗總結目錄和入門指導(Java)

如果我們希望給某個PCollection資料集輸入1個 計算後 的結果,則就需要用到旁路輸入。

例如要計算某個資料集的方差,其公式為

apache beam入門之旁路輸入

這個計算過程就需要用到旁路輸入,來将平均值u傳遞給資料集中的每個數字進行計算。

我們下面這個資料集為例

PCollection<Integer> numbers = pipeline.apply(Create.of(10,20,40,70,80));      

建立旁路輸入視窗

資料集的平均值就是旁路輸入,注意這個旁路輸入的值必須是單個值,即資料集聚合後的1個值。建立方式如下:

// 使用bema自帶的平均值sdk計算平局值
Combine.Globally<Double, Double> combineGlobally = Mean.<Double>globally();
// 加上asSingletonView()轉為旁路輸入視圖
PCollectionView<Double> meanView = numbers.apply(combineGlobally.asSingletonView());      

向PTransform對象組裝旁路輸入

接着建立均值差平方的轉化類

// 建立均值內插補點 平方的轉化類
ParDo.SingleOutput<Double, Double> meanSquareTrans = ParDo.<Double, Double>of(new DoFn<Double, Double>() {
    @ProcessElement
    public void precessElement(ProcessContext context) {
        // 根據旁路輸入的引用,取出旁路輸入值
        Double mean = context.sideInput(meanView);
        Double number = context.element();

        Double answer = Math.abs(number - mean) * Math.abs(number - mean);
        context.output(answer);
    }
});      

然後在apply組裝的時候,用withSideInput将旁路輸入組裝進去,便可進行計算

// 組裝轉化類
// 注意要把meanView作為旁路輸入組裝進去
// 得到 |數值-平均值|^2 的資料集
PCollection<Double> meanSquare = numbers.apply(meanSquareTrans.withSideInputs(meanView));      

注意事項: 如果是基于分布式計算引擎例如spark、flink, 需要确認運作在各executor時, 能否加載到meanView這個PCollectionView引用,如果加載不到,則需要作為私有成員放進DoFn子類中去擷取

combine聚合操作組裝旁路輸入

計算方差需要把meanSquare資料集裡的元素一一相加再除去元素總數,是以也要用到旁路輸入。

// 計算資料集内的元素數量
PCollectionView<Long> numberCount = numbers.apply(Combine.globally(Count.<Double>combineFn()).asSingletonView());

// 如果是自定義Combine類,numberCount需要作為成員傳入,否則分布式計算時無法擷取
PCollection<Double> variance = meanSquare.apply(
  Combine.globally(
    new CaculateVarianceCombineFn(numberCount)
  ).withSideInputs(numberCount)
);      

因為之前介紹的combine方法無法擷取context,因為需要改用另一種combineFn,可以拿到上下文文本context,并擷取旁路輸入

static class CaculateVarianceCombineFn extends CombineWithContext.CombineFnWithContext<Double, Double, Double> {
    private PCollectionView<Long> numberCountView;

    public CaculateVarianceCombineFn(PCollectionView<Long> numberCountView) {
        this.numberCountView = numberCountView;
    }

    // 主要要實作defaultValue這個接口
    @Override
    public Double defaultValue() {
        return Double.valueOf(0);
    }

    @Override
    public Double createAccumulator(CombineWithContext.Context c) {
        return Double.valueOf(0);
    }

    @Override
    public Double addInput(Double accumulator, Double input, CombineWithContext.Context c) {
        return accumulator + input;
    }

    @Override
    public Double mergeAccumulators(Iterable<Double> accumulators, CombineWithContext.Context c) {
        Double mergeResult = createAccumulator(c);
        for(Double accmulator : accumulators) {
            mergeResult += accmulator;
        }
        return mergeResult;
    }

    // 計算最終聚合結果
    @Override
    public Double extractOutput(Double accumulator, CombineWithContext.Context c) {
        Long numberCount = c.sideInput(numberCountView);
        System.out.println("end result=" + accumulator / (numberCount - 1));
        return accumulator / (numberCount - 1);
    }
}      

最終計算拓撲圖

計算拓撲圖如下:

numbers

mean

meanSquare