天天看點

京東:Flink SQL 優化實戰

本文作者為京東算法服務部的張穎和段學浩,并由 Apache Hive PMC,阿裡巴巴技術專家李銳幫忙校對。主要内容為:
  1. 背景
  2. Flink SQL 的優化
  3. 總結

GitHub 位址

https://github.com/apache/flink

歡迎大家給 Flink 點贊送 star~

一、背景

京東:Flink SQL 優化實戰

目前,京東搜尋推薦的資料處理流程如上圖所示。可以看到實時和離線是分開的,離線資料處理大部分用的是 Hive / Spark,實時資料處理則大部分用 Flink / Storm。

這就造成了以下現象:在一個業務引擎裡,使用者需要維護兩套環境、兩套代碼,許多共性不能複用,資料的品質和一緻性很難得到保障。且因為流批底層資料模型不一緻,導緻需要做大量的拼湊邏輯;甚至為了資料一緻性,需要做大量的同比、環比、二次加工等資料對比,效率極差,并且非常容易出錯。

而支援批流一體的 Flink SQL 可以很大程度上解決這個痛點,是以我們決定引入 Flink 來解決這種問題。

在大多數作業,特别是 Flink 作業中,執行效率的優化一直是 Flink 任務優化的關鍵,在京東每天資料增量 PB 級情況下,作業的優化顯得尤為重要。

寫過一些 SQL 作業的同學肯定都知道,對于 Flink SQL 作業,在一些情況下會造成同一個 UDF 被反複調用的情況,這對一些消耗資源的任務非常不友好;此外,影響執行效率大緻可以從 shuffle、join、failover 政策等方面考慮;另外,Flink 任務調試的過程也非常複雜,對于一些線上機器隔離的公司來說尤甚。

為此,我們實作了内嵌式的 Derby 來作為 Hive 的中繼資料存儲資料庫 (allowEmbedded);在任務恢複方面,批式作業沒有 checkpoint 機制來實作failover,但是 Flink 特有的 region 政策可以使批式作業快速恢複;此外,本文還介紹了對象重用等相關優化措施。

二、 Flink SQL 的優化

1. UDF 重用

在 Flink SQL 任務裡會出現以下這種情況:如果相同的 UDF 既出現在 LogicalProject 中,又出現在 Where 條件中,那麼 UDF 會進行多次調用 (見

https://issues.apache.org/jira/browse/FLINK-20887)

。但是如果該 UDF 非常耗 CPU 或者記憶體,這種多餘的計算會非常影響性能,為此我們希望能把 UDF 的結果緩存起來下次直接使用。在設計的時候需要考慮:(非常重要:請一定保證 LogicalProject 和 where 條件的 subtask chain 到一起)

  • 一個 taskmanager 裡面可能會有多個 subtask,是以這個 cache 要麼是 thread (THREAD LOCAL) 級别要麼是 tm 級别;
  • 為了防止出現一些情況導緻清理 cache 的邏輯走不到,一定要在 close 方法裡将 cache 清掉;
  • 為了防止記憶體無限增大,選取的 cache 最好可以主動控制 size;至于 “逾時時間”,建議可以配置一下,但是最好不要小于 UDF 先後調用的時間;
  • 上文有提到過,一個 tm 裡面可能會有多個 subtask,相當于 tm 裡面是個多線程的環境。首先我們的 cache 需要是線程安全的,然後可根據業務判斷需不需要鎖。

根據以上考慮,我們用 guava cache 将 UDF 的結果緩存起來,之後調用的時候直接去cache 裡面拿資料,最大可能降低任務的消耗。下面是一個簡單的使用(同時設定了最大使用 size、逾時時間,但是沒有寫鎖):

public class RandomFunction extends ScalarFunction {
    private static Cache<String, Integer> cache = CacheBuilder.newBuilder()
            .maximumSize(2)
            .expireAfterWrite(3, TimeUnit.SECONDS)
            .build();

    public int eval(String pvid) {
        profileLog.error("RandomFunction invoked:" + atomicInteger.incrementAndGet());
        Integer result = cache.getIfPresent(pvid);
        if (null == result) {
            int tmp = (int)(Math.random() * 1000);
            cache.put("pvid", tmp);
            return tmp;
        }
        return result;
    }
    @Override
    public void close() throws Exception {
        super.close();
        cache.cleanUp();
    }
}           

2. 單元測試

大家可能會好奇為什麼會把單元測試也放到優化裡面,大家都知道 Flink 任務調試過程非常複雜,對于一些線上機器隔離的公司來說尤甚。京東的本地環境是沒有辦法通路任務伺服器的,是以在初始階段調試任務,我們耗費了很多時間用來上傳 jar 包、檢視日志等行為。

為了降低任務的調試時間、增加代碼開發人員的開發效率,實作了内嵌式的 Derby 來作為 Hive 的中繼資料存儲資料庫 (allowEmbedded),這算是一種優化開發時間的方法。具體思路如下:

首先建立 Hive Conf:

public static HiveConf createHiveConf() {
    ClassLoader classLoader = new HiveOperatorTest().getClass().getClassLoader();
    HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML));

    try {
        TEMPORARY_FOLDER.create();
        String warehouseDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db";
        String warehouseUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir);

        HiveConf hiveConf = new HiveConf();
        hiveConf.setVar(
                HiveConf.ConfVars.METASTOREWAREHOUSE,
                TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath());
        hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri);

        hiveConf.set("datanucleus.connectionPoolingType", "None");
        hiveConf.set("hive.metastore.schema.verification", "false");
        hiveConf.set("datanucleus.schema.autoCreateTables", "true");
        return hiveConf;
    } catch (IOException e) {
        throw new CatalogException("Failed to create test HiveConf to HiveCatalog.", e);
    }
}           

接下來建立 Hive Catalog:(利用反射的方式調用 embedded 的接口)

public static void createCatalog() throws Exception{
    Class clazz = HiveCatalog.class;
    Constructor c1 = clazz.getDeclaredConstructor(new Class[]{String.class, String.class, HiveConf.class, String.class, boolean.class});
    c1.setAccessible(true);
    hiveCatalog = (HiveCatalog)c1.newInstance(new Object[]{"test-catalog", null, createHiveConf(), "2.3.4", true});
    hiveCatalog.open();
}           

建立 tableEnvironment:(同官網)

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
TableConfig tableConfig = tableEnv.getConfig();
Configuration configuration = new Configuration();
configuration.setInteger("table.exec.resource.default-parallelism", 1);
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());           

最後關閉 Hive Catalog:

public static void closeCatalog() {
    if (hiveCatalog != null) {
        hiveCatalog.close();
    }
}           

此外,對于單元測試,建構合适的資料集也是一個非常大的功能,我們實作了 CollectionTableFactory,允許自己建構合适的資料集,使用方法如下:

CollectionTableFactory.reset();
CollectionTableFactory.initData(Arrays.asList(Row.of("this is a test"), Row.of("zhangying480"), Row.of("just for test"), Row.of("a test case")));
StringBuilder sbFilesSource = new StringBuilder();
sbFilesSource.append("CREATE temporary TABLE db1.`search_realtime_table_dump_p13`(" + "  `pvid` string) with ('connector.type'='COLLECTION','is-bounded' = 'true')");
tableEnv.executeSql(sbFilesSource.toString());           

3. join 方式的選擇

傳統的離線 Batch SQL (面向有界資料集的 SQL) 有三種基礎的實作方式,分别是 Nested-loop Join、Sort-Merge Join 和 Hash Join。

效率 空間 備注
Nested-loop Join 占用大
Sort-Merge Join 有sort merge開銷 占用小 有序資料集的一種優化措施
Hash Join 适合大小表
  • Nested-loop Join 最為簡單直接,将兩個資料集加載到記憶體,并用内嵌周遊的方式來逐個比較兩個資料集内的元素是否符合 Join 條件。Nested-loop Join 的時間效率以及空間效率都是最低的,可以使用:table.exec.disabled-operators:NestedLoopJoin 來禁用。

    以下兩張圖檔是禁用前和禁用後的效果 (如果你的禁用沒有生效,先看一下是不是 Equi-Join):

    京東:Flink SQL 優化實戰
    京東:Flink SQL 優化實戰
  • Sort-Merge Join 分為 Sort 和 Merge 兩個階段:首先将兩個資料集進行分别排序,然後再對兩個有序資料集分别進行周遊和比對,類似于歸并排序的合并。(Sort-Merge Join 要求對兩個資料集進行排序,但是如果兩個輸入是有序的資料集,則可以作為一種優化方案)。
  • Hash Join 同樣分為兩個階段:首先将一個資料集轉換為 Hash Table,然後周遊另外一個資料集元素并與 Hash Table 内的元素進行比對。
    • 第一階段和第一個資料集分别稱為 build 階段和 build table;
    • 第二個階段和第二個資料集分别稱為 probe 階段和 probe table。
    Hash Join 效率較高但是對空間要求較大,通常是作為 Join 其中一個表為适合放入記憶體的小表的情況下的優化方案 (并不是不允許溢寫磁盤)。

注意:Sort-Merge Join 和 Hash Join 隻适用于 Equi-Join ( Join 條件均使用等于作為比較算子)。

Flink 在 join 之上又做了一些細分,具體包括:

特點 使用
Repartition-Repartition strategy 對資料集分别進行分區和shuffle,如果資料集大的時候效率極差 兩個資料集相差不大
Broadcast-Forward strategy 将小表的資料全部發送到大表資料的機器上 兩個資料集有較大的差距
  • Repartition-Repartition strategy:Join 的兩個資料集分别對它們的 key 使用相同的分區函數進行分區,并經過網絡發送資料;
  • Broadcast-Forward strategy:大的資料集不做處理,另一個比較小的資料集全部複制到叢集中一部分資料的機器上。

衆所周知,batch 的 shuffle 非常耗時間。

  • 如果兩個資料集有較大差距,建議采用 Broadcast-Forward strategy;
  • 如果兩個資料集差不多,建議采用 Repartition-Repartition strategy。

可以通過:table.optimizer.join.broadcast-threshold 來設定采用 broadcast 的 table 大小,如果設定為 “-1”,表示禁用 broadcast。

下圖為禁用前後的效果:

京東:Flink SQL 優化實戰
京東:Flink SQL 優化實戰

4. multiple input

在 Flink SQL 任務裡,降低 shuffle 可以有效的提高 SQL 任務的吞吐量,在實際的業務場景中經常遇到這樣的情況:上遊産出的資料已經滿足了資料分布要求 (如連續多個 join 算子,其中 key 是相同的),此時 Flink 的 forward shuffle 是備援的 shuffle,我們希望将這些算子 chain 到一起。Flink 1.12 引入了 mutiple input 的特性,可以消除大部分沒必要的 forward shuffle,把 source 的算子 chain 到一起。

table.optimizer.multiple-input-enabled:true

下圖為開了 multiple input 和沒有開的拓撲圖 ( operator chain 功能已經打開):

京東:Flink SQL 優化實戰
京東:Flink SQL 優化實戰

5. 對象重用

上下遊 operator 之間會經過序列化 / 反序列化 / 複制階段來進行資料傳輸,這種行為非常影響 Flink SQL 程式的性能,可以通過啟用對象重用來提高性能。但是這在 DataStream 裡面非常危險,因為可能會發生以下情況:在下一個算子中修改對象意外影響了上面算子的對象。

但是 Flink 的 Table / SQL API 中是非常安全的,可以通過如下方式來啟用:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();           

或者是通過設定:pipeline-object-reuse:true

為什麼啟用了對象重用會有這麼大的性能提升?在 Blink planner 中,同一任務的兩個算子之間的資料交換最終将調用 BinaryString#copy,檢視實作代碼,可以發現 BinaryString#copy 需要複制底層 MemorySegment 的位元組,通過啟用對象重用來避免複制,可以有效提升效率。

下圖為沒有開啟對象重用時相應的火焰圖:

京東:Flink SQL 優化實戰

6. SQL 任務的 failover 政策

batch 任務模式下 checkpoint 以及其相關的特性全部都不可用,是以針對實時任務的基于 checkpoint 的 failover 政策是不能應用在批任務上面的,但是 batch 任務允許 Task 之間通過 Blocking Shuffle 進行通信,當一個 Task 因為任務未知的原因失敗之後,由于 Blocking Shuffle 中存儲了這個 Task 所需要的全部資料,是以隻需要重新開機這個 Task 以及通過 Pipeline Shuffle 與其相連的全部下遊任務即可:

jobmanager.execution.failover-strategy:region (已經 finish 的 operator 可直接恢複)

table.exec.shuffle-mode:ALL_EDGES_BLOCKING (shuffle 政策)。

7. shuffle

Flink 裡的 shuffle 分為 pipeline shuffle 和 blocking shuffle。

  • pipeline shuffle 性能好,但是對資源的要求高,而且容錯比較差 (會将該 operator 分到前面的一個 region 裡面,對于 batch 任務來說,如果這個算子出問題,将從上一個 region 恢複);
  • blocking shuffle 就是傳統的 batch shuffle,會将資料落盤,這種 shuffle 的容錯好,但是會産生大量的磁盤、網絡 io (如果為了省心的話,建議用 blocking suffle)。blocking shuffle 又分為 hash shuffle 和 sort shuffle,
    • 如果你的磁盤是 ssd 并且并發不太大的話,可以選擇使用 hash shuffle,這種 shuffle 方式産生的檔案多、随機讀多,對磁盤 io 影響較大;
    • 如果你是 sata 并且并發比較大,可以選擇用 sort-merge shuffle,這種 shuffle 産生的資料少,順序讀,不會産生大量的磁盤 io,不過開銷會更大一些 (sort merge)。

相應的控制參數:

table.exec.shuffle-mode,該參數有多個參數,預設是 ALL_EDGES_BLOCKING,表示所有的邊都會用 blocking shuffle,不過大家可以試一下 POINTWISE_EDGES_PIPELINED,表示 forward 和 rescale edges 會自動開始 pipeline 模式。

taskmanager.network.sort-shuffle.min-parallelism ,将這個參數設定為小于你的并行度,就可以開啟 sort-merge shuffle;這個參數的設定需要考慮一些其他的情況,具體的可以按照官網設定。

三、總結

本文着重從 shuffle、join 方式的選擇、對象重用、UDF 重用等方面介紹了京東在 Flink SQL 任務方面做的優化措施。另外,感謝京東實時計算研發部付海濤等全部同僚的支援與幫助。

更多 Flink 相關技術問題,可掃碼加入社群釘釘交流群

第一時間擷取最新技術文章和社群動态,請關注公衆号~

京東:Flink SQL 優化實戰

活動推薦

阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:

99 元試用

實時計算Flink版

(包年包月、10CU)即有機會獲得 Flink 獨家定制T恤;另包 3 個月及以上還有 85 折優惠!

了解活動詳情:

https://www.aliyun.com/product/bigdata/sc
京東:Flink SQL 優化實戰