天天看點

Apache-Flink深度解析-JOIN 算子

聊什麼

在《SQL概覽》中我們介紹了JOIN算子的語義和基本的使用方式,介紹過程中大家發現Apache Flink在文法語義上是遵循ANSI-SQL标準的,那麼再深思一下傳統資料庫為啥需要有JOIN算子呢?在實作原理上面Apache Flink内部實作和傳統資料庫有什麼差別呢?本篇将詳盡的為大家介紹傳統資料庫為什麼需要JOIN算子,以及JOIN算子在Apache Flink中的底層實作原理和在實際使用中的優化!

什麼是JOIN

在《Apache Flink 漫談系列 - SQL概覽》中我對JOIN算子有過簡單的介紹,這裡我們以具體執行個體的方式讓大家對JOIN算子加深印象。JOIN的本質是分别從N(N>=1)張表中擷取不同的字段,進而得到最完整的記錄行。比如我們有一個查詢需求:在學生表(學号,姓名,性别),課程表(課程号,課程名,學分)和成績表(學号,課程号,分數)中查詢所有學生的姓名,課程名和考試分數。如下

為啥需要JOIN

JOIN的本質是資料拼接,那麼如果我們将所有資料列存儲在一張大表中,是不是就不需要JOIN了呢?如果真的能将所需的資料都在一張表存儲,我想就真的不需要JOIN的算子了,但現實業務中真的能做到将所需資料放到同一張大表裡面嗎?答案是否定的,核心原因有2個:

  • 産生資料的源頭可能不是一個系統;
  • 産生資料的源頭是同一個系統,但是資料備援的沉重代價,迫使我們會遵循資料庫範式,進行表的設計。簡說NF如下: 
    • 1NF - 列不可再分;
    • 2NF - 符合1NF,并且非主鍵屬性全部依賴于主鍵屬性;
    • 3NF - 符合2NF,并且消除傳遞依賴,即:任何字段不能由其他字段派生出來;
    • BCNF - 符合3NF,并且主鍵屬性之間無依賴關系。

當然還有 4NF,5NF,不過在實際的資料庫設計過程中做到BCNF已經足夠了!(并非否定4NF,5NF存在的意義,隻是個人還沒有遇到一定要用4NF,5NF的場景,設計往往會按存儲成本,查詢性能等綜合因素考量)

JOIN種類

JOIN 在傳統資料庫中有如下分類:

  • CROSS JOIN - 交叉連接配接,計算笛卡兒積;
  • INNER JOIN - 内連接配接,傳回滿足條件的記錄;
  • OUTER JOIN
    • LEFT - 傳回左表所有行,右表不存在補NULL;
    • RIGHT - 傳回右表所有行,左邊不存在補NULL;
    • FULL -  傳回左表和右表的并集,不存在一邊補NULL;
  • SELF JOIN - 自連接配接,将表查詢時候命名不同的别名。

JOIN文法

JOIN 在SQL89和SQL92中有不同的文法,以INNER JOIN為例說明:

  • SQL89 - 表之間用“,”逗号分割,連結條件和過濾條件都在Where子句指定:

SELECT   a.colA,   b.colA FROM    tab1 AS a , tab2 AS b WHERE a.id = b.id and a.other > b.other

  • SQL92 - SQL92将連結條件在ON子句指定,過濾條件在WHERE子句指定,邏輯更為清晰:

SELECT   a.colA,   b.colA FROM   tab1 AS a JOIN tab2 AS b ON a.id = b.id WHERE   a.other > b.other

本篇中的後續示例将應用SQL92文法進行SQL的編寫,文法如下:

tableExpression [ LEFT|RIGHT|FULL|INNER|SELF ] JOIN tableExpression [ ON joinCondition ] [WHERE filterCondition]

語義示例說明

在《SQL概覽》中對JOIN語義有過簡單介紹,這裡會進行展開介紹。 我們以開篇示例中的三張表:學生表(學号,姓名,性别),課程表(課程号,課程名,學分)和成績表(學号,課程号,分數)來介紹各種JOIN的語義。

CROSS JOIN

交叉連接配接會對兩個表進行笛卡爾積,也就是LEFT表的每一行和RIGHT表的所有行進行聯接,是以生成結果表的行數是兩個表行數的乘積,如student和course表的CROSS JOIN結果如下:

mysql> SELECT * FROM student JOIN course; +------+-------+------+-----+-------+--------+ | no   | name  | sex  | no  | name  | credit | +------+-------+------+-----+-------+--------+ | S001 | Sunny | M    | C01 | Java  |      2 | | S002 | Tom   | F    | C01 | Java  |      2 | | S003 | Kevin | M    | C01 | Java  |      2 | | S001 | Sunny | M    | C02 | Blink |      3 | | S002 | Tom   | F    | C02 | Blink |      3 | | S003 | Kevin | M    | C02 | Blink |      3 | | S001 | Sunny | M    | C03 | Spark |      3 | | S002 | Tom   | F    | C03 | Spark |      3 | | S003 | Kevin | M    | C03 | Spark |      3 | +------+-------+------+-----+-------+--------+ 9 rows in set (0.00 sec)

如上結果我們得到9行=student(3) x course(3)。交叉聯接一般會消耗較大的資源,也被很多使用者質疑交叉聯接存在的意義?(任何時候我們都有質疑的權利,同時也建議我們養成自己質疑自己“質疑”的習慣,就像小時候不了解父母的“廢話”一樣)。

我們以開篇的示例說明交叉聯接的巧妙之一,開篇中我們的查詢需求是:在學生表(學号,姓名,性别),課程表(課程号,課程名,學分)和成績表(學号,課程号,分數)中查詢所有學生的姓名,課程名和考試分數。開篇中的SQL語句得到的結果如下:

mysql> SELECT     ->   student.name, course.name, score     -> FROM student JOIN  score ON student.no = score.s_no     ->              JOIN course ON score.c_no = course.no; +-------+-------+-------+ | name  | name  | score | +-------+-------+-------+ | Sunny | Java  |    80 | | Sunny | Blink |    98 | | Sunny | Spark |    76 | | Kevin | Java  |    78 | | Kevin | Blink |    88 | | Kevin | Spark |    68 | +-------+-------+-------+ 6 rows in set (0.00 sec)

如上INNER JOIN的結果我們發現少了Tom同學的成績,原因是Tom同學沒有參加考試,在score表中沒有Tom的成績,但是我們可能希望雖然Tom沒有參加考試但仍然希望Tom的成績能夠在查詢結果中顯示(成績 0 分),面對這樣的需求,我們怎麼處理呢?交叉聯接可以幫助我們:

  • 第一步 student和course 進行交叉聯接:

mysql> SELECT     ->   stu.no, c.no, stu.name, c.name     -> FROM student stu JOIN course c  笛卡爾積     -> ORDER BY stu.no; -- 排序隻是友善大家檢視:) +------+-----+-------+-------+ | no   | no  | name  | name  | +------+-----+-------+-------+ | S001 | C03 | Sunny | Spark | | S001 | C01 | Sunny | Java  | | S001 | C02 | Sunny | Blink | | S002 | C03 | Tom   | Spark | | S002 | C01 | Tom   | Java  | | S002 | C02 | Tom   | Blink | | S003 | C02 | Kevin | Blink | | S003 | C03 | Kevin | Spark | | S003 | C01 | Kevin | Java  | +------+-----+-------+-------+ 9 rows in set (0.00 sec)

  • 第二步 将交叉聯接的結果與score表進行左外聯接,如下:

mysql> SELECT     ->   stu.no, c.no, stu.name, c.name,     ->    CASE     ->     WHEN s.score IS NULL THEN 0     ->     ELSE s.score     ->   END AS score     -> FROM student stu JOIN course c  -- 迪卡爾積     -> LEFT JOIN score s ON stu.no = s.s_no and c.no = s.c_no -- LEFT OUTER JOIN     -> ORDER BY stu.no; -- 排序隻是為了大家好看一點:) +------+-----+-------+-------+-------+ | no   | no  | name  | name  | score | +------+-----+-------+-------+-------+ | S001 | C03 | Sunny | Spark |    76 | | S001 | C01 | Sunny | Java  |    80 | | S001 | C02 | Sunny | Blink |    98 | | S002 | C02 | Tom   | Blink |     0 | -- TOM 雖然沒有參加考試,但是仍然看到他的資訊 | S002 | C03 | Tom   | Spark |     0 | | S002 | C01 | Tom   | Java  |     0 | | S003 | C02 | Kevin | Blink |    88 | | S003 | C03 | Kevin | Spark |    68 | | S003 | C01 | Kevin | Java  |    78 | +------+-----+-------+-------+-------+ 9 rows in set (0.00 sec)

經過CROSS JOIN幫我們将Tom的資訊也查詢出來了!(TOM 雖然沒有參加考試,但是仍然看到他的資訊)

INNER JOIN

内聯接在SQL92中 ON 表示聯接添加,可選的WHERE子句表示過濾條件,如開篇的示例就是一個多表的内聯接,我們在看一個簡單的示例: 查詢成績大于80分的學生學号,學生姓名和成績:

mysql> SELECT     ->   stu.no, stu.name , s.score     -> FROM student stu JOIN score s ON  stu.no = s.s_no     -> WHERE s.score > 80; +------+-------+-------+ | no   | name  | score | +------+-------+-------+ | S001 | Sunny |    98 | | S003 | Kevin |    88 | +------+-------+-------+ 2 rows in set (0.00 sec)

上面按語義的邏輯是:

  • 第一步:先進行student和score的内連接配接,如下:

mysql> SELECT     ->   stu.no, stu.name , s.score     -> FROM student stu JOIN score s ON  stu.no = s.s_no ; +------+-------+-------+ | no   | name  | score | +------+-------+-------+ | S001 | Sunny |    80 | | S001 | Sunny |    98 | | S001 | Sunny |    76 | | S003 | Kevin |    78 | | S003 | Kevin |    88 | | S003 | Kevin |    68 | +------+-------+-------+ 6 rows in set (0.00 sec)

  • 第二步:對内聯結果進行過濾, score > 80 得到,如下最終結果:   

-> WHERE s.score > 80; +------+-------+-------+ | no   | name  | score | +------+-------+-------+ | S001 | Sunny |    98 | | S003 | Kevin |    88 | +------+-------+-------+ 2 rows in set (0.00 sec) 

上面的查詢過程符合語義,但是如果在filter條件能過濾很多資料的時候,先進行資料的過濾,在進行内聯接會擷取更好的性能,比如我們手工寫一下:

mysql> SELECT     ->   no, name , score     -> FROM student stu JOIN ( SELECT s_no, score FROM score s WHERE s.score >80) as sc ON no = s_no; +------+-------+-------+ | no   | name  | score | +------+-------+-------+ | S001 | Sunny |    98 | | S003 | Kevin |    88 | +------+-------+-------+ 2 rows in set (0.00 sec)

上面寫法語義和第一種寫法語義一緻,得到相同的查詢結果,上面查詢過程是:

  • 第一步:執行過濾子查詢

mysql> SELECT s_no, score FROM score s WHERE s.score >80; +------+-------+ | s_no | score | +------+-------+ | S001 |    98 | | S003 |    88 | +------+-------+ 2 rows in set (0.00 sec)

  • 第二步:執行内連接配接   

-> ON no = s_no; +------+-------+-------+ | no   | name  | score | +------+-------+-------+ | S001 | Sunny |    98 | | S003 | Kevin |    88 | +------+-------+-------+ 2 rows in set (0.00 sec)

如上兩種寫法在語義上一緻,但查詢性能在數量很大的情況下會有很大差距。上面為了和大家示範相同的查詢語義,可以有不同的查詢方式,不同的執行計劃。實際上資料庫本身的優化器會自動進行查詢優化,在内聯接中ON的聯接條件和WHERE的過濾條件具有相同的優先級,具體的執行順序可以由資料庫的優化器根據性能消耗決定。也就是說實體執行計劃可以先執行過濾條件進行查詢優化,如果細心的讀者可能發現,在第二個寫法中,子查詢我們不但有行的過濾,也進行了列的裁剪(去除了對查詢結果沒有用的c_no列),這兩個變化實際上對應了資料庫中兩個優化規則:

  • filter push down
  • project push down

如上優化規則以filter push down 為例,示意優化器對執行plan的優化變動:

LEFT OUTER JOIN

左外聯接語義是傳回左表所有行,右表不存在補NULL,為了示範作用,我們查詢沒有參加考試的所有學生的成績單:

mysql> SELECT     ->   no, name , s.c_no, s.score     -> FROM student stu LEFT JOIN score s ON stu.no = s.s_no     -> WHERE s.score is NULL; +------+------+------+-------+ | no   | name | c_no | score | +------+------+------+-------+ | S002 | Tom  | NULL |  NULL | +------+------+------+-------+ 1 row in set (0.00 sec)

上面查詢的執行邏輯上也是分成兩步:

  • 第一步:左外聯接查詢

mysql> SELECT     ->   no, name , s.c_no, s.score     -> FROM student stu LEFT JOIN score s ON stu.no = s.s_no; +------+-------+------+-------+ | no   | name  | c_no | score | +------+-------+------+-------+ | S001 | Sunny | C01  |    80 | | S001 | Sunny | C02  |    98 | | S001 | Sunny | C03  |    76 | | S002 | Tom   | NULL |  NULL | -- 右表不存在的補NULL | S003 | Kevin | C01  |    78 | | S003 | Kevin | C02  |    88 | | S003 | Kevin | C03  |    68 | +------+-------+------+-------+ 7 rows in set (0.00 sec)

  • 第二步:過濾查詢

mysql> SELECT     ->   no, name , s.c_no, s.score     -> FROM student stu LEFT JOIN score s ON stu.no = s.s_no     -> WHERE s.score is NULL; +------+------+------+-------+ | no   | name | c_no | score | +------+------+------+-------+ | S002 | Tom  | NULL |  NULL | +------+------+------+-------+ 1 row in set (0.00 sec)

這兩個過程和上面分析的INNER JOIN一樣,但是這時候能否利用上面說的 filter push down的優化呢?根據LEFT OUTER JOIN的語義來講,答案是否定的。我們手工操作看一下:

  • 第一步:先進行過濾查詢(獲得一個空表)

mysql> SELECT * FROM score s WHERE s.score is NULL; Empty set (0.00 sec)

  • 第二步: 進行左外連結

mysql> SELECT     ->   no, name , s.c_no, s.score     -> FROM student stu LEFT JOIN (SELECT * FROM score s WHERE s.score is NULL) AS s ON stu.no = s.s_no; +------+-------+------+-------+ | no   | name  | c_no | score | +------+-------+------+-------+ | S001 | Sunny | NULL |  NULL | | S002 | Tom   | NULL |  NULL | | S003 | Kevin | NULL |  NULL | +------+-------+------+-------+ 3 rows in set (0.00 sec)

我們發現兩種寫法的結果不一緻,第一種寫法隻傳回Tom沒有參加考試,是我們預期的。第二種寫法傳回了Sunny,Tom和Kevin三名同學都沒有參加考試,這明顯是非預期的查詢結果。所有LEFT OUTER JOIN不能利用INNER JOIN的 filter push down優化。

RIGHT OUTER JOIN

右外連結語義是傳回右表所有行,左邊不存在補NULL,如下:

mysql> SELECT     ->   s.c_no, s.score, no, name     -> FROM score s RIGHT JOIN student stu ON stu.no = s.s_no; +------+-------+------+-------+ | c_no | score | no   | name  | +------+-------+------+-------+ | C01  |    80 | S001 | Sunny | | C02  |    98 | S001 | Sunny | | C03  |    76 | S001 | Sunny | | NULL |  NULL | S002 | Tom   | -- 左邊沒有的進行補 NULL | C01  |    78 | S003 | Kevin | | C02  |    88 | S003 | Kevin | | C03  |    68 | S003 | Kevin | +------+-------+------+-------+ 7 rows in set (0.00 sec)

上面右外連結我隻是将上面左外連結查詢的左右表交換了一下:)。

FULL OUTER JOIN

全外連結語義傳回左表和右表的并集,不存在一邊補NULL,用于示範的MySQL資料庫不支援FULL OUTER JOIN。這裡不做示範了。

SELF JOIN

上面介紹的INNER JOIN、OUTER JOIN都是不同表之間的聯接查詢,自聯接是一張表以不同的别名做為左右兩個表,可以進行如上的INNER JOIN和OUTER JOIN。如下看一個INNER 自聯接:

mysql> SELECT * FROM student l JOIN student r where l.no = r.no; +------+-------+------+------+-------+------+ | no   | name  | sex  | no   | name  | sex  | +------+-------+------+------+-------+------+ | S001 | Sunny | M    | S001 | Sunny | M    | | S002 | Tom   | F    | S002 | Tom   | F    | | S003 | Kevin | M    | S003 | Kevin | M    | +------+-------+------+------+-------+------+ 3 rows in set (0.00 sec) 

不等值聯接

這裡說的不等值聯接是SQL92文法裡面的ON子句裡面隻有不等值聯接,比如:

mysql> SELECT     ->   s.c_no, s.score, no, name     -> FROM score s RIGHT JOIN student stu ON stu.no != s.c_no; +------+-------+------+-------+ | c_no | score | no   | name  | +------+-------+------+-------+ | C01  |    80 | S001 | Sunny | | C01  |    80 | S002 | Tom   | | C01  |    80 | S003 | Kevin | | C02  |    98 | S001 | Sunny | | C02  |    98 | S002 | Tom   | | C02  |    98 | S003 | Kevin | | C03  |    76 | S001 | Sunny | | C03  |    76 | S002 | Tom   | | C03  |    76 | S003 | Kevin | | C01  |    78 | S001 | Sunny | | C01  |    78 | S002 | Tom   | | C01  |    78 | S003 | Kevin | | C02  |    88 | S001 | Sunny | | C02  |    88 | S002 | Tom   | | C02  |    88 | S003 | Kevin | | C03  |    68 | S001 | Sunny | | C03  |    68 | S002 | Tom   | | C03  |    68 | S003 | Kevin | +------+-------+------+-------+ 18 rows in set (0.00 sec)

上面這示例,其實沒有什麼實際業務價值,在實際的使用場景中,不等值聯接往往是結合等值聯接,将不等值條件在WHERE子句指定,即, 帶有WHERE子句的等值聯接。

Apache Flink雙流JOIN

CROSS

INNER

OUTER

SELF

ON

WHERE

Apache Flink N Y 必選 可選

Apache Flink目前支援INNER JOIN和LEFT OUTER JOIN(SELF 可以轉換為普通的INNER和OUTER)。在語義上面Apache Flink嚴格遵守标準SQL的語義,與上面示範的語義一緻。下面我重點介紹Apache Flink中JOIN的實作原理。

雙流JOIN與傳統資料庫表JOIN的差別

傳統資料庫表的JOIN是兩張靜态表的資料聯接,在流上面是 動态表(關于流與動态表的關系請查閱 《Apache Flink 漫談系列 - 流表對偶(duality)性)》,雙流JOIN的資料不斷流入與傳統資料庫表的JOIN有如下3個核心差別:

  • 左右兩邊的資料集合無窮 - 傳統資料庫左右兩個表的資料集合是有限的,雙流JOIN的資料會源源不斷的流入;
  • JOIN的結果不斷産生/更新 - 傳統資料庫表JOIN是一次執行産生最終結果後退出,雙流JOIN會持續不斷的産生新的結果。在 《Apache Flink 漫談系列 - 持續查詢(Continuous Queries)》篇也有相關介紹。
  • 查詢計算的雙邊驅動 - 雙流JOIN由于左右兩邊的流的速度不一樣,會導緻左邊資料到來的時候右邊資料還沒有到來,或者右邊資料到來的時候左邊資料沒有到來,是以在實作中要将左右兩邊的流資料進行儲存,以保證JOIN的語義。在Blink中會以State的方式進行資料的存儲。State相關請檢視《Apache Flink 漫談系列 - State》篇。

資料Shuffle

分布式流計算所有資料會進行Shuffle,怎麼才能保障左右兩邊流的要JOIN的資料會在相同的節點進行處理呢?在雙流JOIN的場景,我們會利用JOIN中ON的聯接key進行partition,確定兩個流相同的聯接key會在同一個節點處理。

資料的儲存

不論是INNER JOIN還是OUTER JOIN 都需要對左右兩邊的流的資料進行儲存,JOIN算子會開辟左右兩個State進行資料存儲,左右兩邊的資料到來時候,進行如下操作:

  • LeftEvent到來存儲到LState,RightEvent到來的時候存儲到RState;
  • LeftEvent會去RightState進行JOIN,并發出所有JOIN之後的Event到下遊;
  • RightEvent會去LeftState進行JOIN,并發出所有JOIN之後的Event到下遊。

簡單場景介紹實作原理 

INNER JOIN 實作

JOIN有很多複雜的場景,我們先以最簡單的場景進行實作原理的介紹,比如:最直接的兩個進行INNER JOIN,比如查詢産品庫存和訂單數量,庫存變化事件流和訂單事件流進行INNER JOIN,JION條件是産品ID,具體如下:

雙流JOIN兩邊事件都會存儲到State裡面,如上,事件流按照标号先後流入到join節點,我們假設右邊流比較快,先流入了3個事件,3個事件會存儲到state中,但因為左邊還沒有資料,所有右邊前3個事件流入時候,沒有join結果流出,當左邊第一個事件序号為4的流入時候,先存儲左邊state,再與右邊已經流入的3個事件進行join,join的結果如圖 三行結果會流入到下遊節點sink。當第5号事件流入時候,也會和左邊第4号事件進行join,流出一條jion結果到下遊節點。這裡關于INNER JOIN的語義和大家強調兩點:

  • INNER JOIN隻有符合JOIN條件時候才會有JOIN結果流出到下遊,比如右邊最先來的1,2,3個事件,流入時候沒有任何輸出,因為左邊還沒有可以JOIN的事件;
  • INNER JOIN兩邊的資料不論如何亂序,都能夠保證和傳統資料庫語義一緻,因為我們儲存了左右兩個流的所有事件到state中。

LEFT OUTER JOIN 實作

LEFT OUTER JOIN 可以簡寫 LEFT JOIN,語義上和INNER JOIN的差別是不論右流是否有JOIN的事件,左流的事件都需要流入下遊節點,但右流沒有可以JION的事件時候,右邊的事件補NULL。同樣我們以最簡單的場景說明LEFT JOIN的實作,比如查詢産品庫存和訂單數量,庫存變化事件流和訂單事件流進行LEFT JOIN,JION條件是産品ID,具體如下:

下圖也是表達LEFT JOIN的語義,隻是展現方式不同:

上圖主要關注點是當左邊先流入1,2事件時候,右邊沒有可以join的事件時候會向下遊發送左邊事件并補NULL向下遊發出,當右邊第一個相同的Join key到來的時候會将左邊先來的事件發出的帶有NULL的事件撤回(對應上面command的-記錄,+代表正向記錄,-代表撤回記錄)。這裡強調三點:

  • 左流的事件當右邊沒有JOIN的事件時候,将右邊事件列補NULL後流向下遊;* 當右邊事件流入發現左邊已經有可以JOIN的key的時候,并且是第一個可以JOIN上的右邊事件(比如上面的3事件是第一個可以和左邊JOIN key P001進行JOIN的事件)需要撤回左邊下發的NULL記錄,并下發JOIN完整(帶有右邊事件列)的事件到下遊。後續來的4,5,6,8等待後續P001的事件是不會産生撤回記錄的。
  • 在Apache Flink系統内部事件類型分為正向事件标記為“+”和撤回事件标記為“-”。

RIGHT OUTER JOIN  和 FULL OUTER JOIN

RIGHT JOIN内部實作與LEFT JOIN類似, FULL JOIN和LEFT JOIN的差別是左右兩邊都會産生補NULL和撤回的操作。對于State的使用都是相似的,這裡不再重複說明了。

複雜場景介紹State結構

上面我們介紹了雙流JOIN會使用State記錄左右兩邊流的事件,同時我們示例資料的場景也是比較簡單,比如流上沒有更新事件(沒有撤回事件),同時流上沒有重複行事件。那麼我們嘗試思考下面的事件流在雙流JOIN時候是怎麼處理的?

上圖示例是連續産生了2筆銷售數量一樣的訂單,同時在産生一筆銷售數量為5的訂單之後,又将該訂單取消了(或者退貨了),這樣在事件流上面就會是上圖的示意,這種情況Blink内部如何支撐呢?

根據JOIN的語義以INNER JOIN為例,右邊有兩條相同的訂單流入,我們就應該向下遊輸出兩條JOIN結果,當有撤回的事件流入時候,我們也需要将已經下發下遊的JOIN事件撤回,如下:

上面的場景以及LEFT JOIN部分介紹的撤回情況,Apache Flink内部需要處理如下幾個核心點:

  • 記錄重複記錄(完整記錄重複記錄或者記錄相同記錄的個數)
  • 記錄正向記錄和撤回記錄(完整記錄正向和撤回記錄或者記錄個數)
  • 記錄哪一條事件是第一個可以與左邊事件進行JOIN的事件

雙流JOIN的State資料結構

在Apache Flink内部對不同的場景有特殊的資料結構優化,本篇我們隻針對上面說的情況(通用設計)介紹一下雙流JOIN的State的資料結構和用途:

資料結構

  • Map<JoinKey, Map<rowData, count>>;
    • 第一級MAP的key是Join key,比如示例中的P001, value是流上面的所有完整事件;
    • 第二級MAP的key是行資料,比如示例中的P001, 2,value是相同僚件值的個數

資料結構的利用

  • 記錄重複記錄 - 利用第二級MAP的value記錄重複記錄的個數,這樣大大減少存儲和讀取
  • 正向記錄和撤回記錄 - 利用第二級MAP的value記錄,當count=0時候删除該元素
  • 判斷右邊是否産生撤回記錄 - 根據第一級MAP的value的size來判斷是否産生撤回,隻有size由0變成1的時候(第一條和左可以JOIN的事件)才産生撤回

雙流JOIN的應用優化

構造更新流

我們在 《Apache Flink 漫談系列 - 持續查詢(Continuous Queries)》篇中以雙流JOIN為例介紹了如何構造業務上的PK source,構造PK source本質上在保證業務語義的同時也是對雙流JOIN的一種優化,比如多級LEFT JOIN會讓流上的資料不斷膨脹,造成JOIN節點性能較慢,JOIN之後的下遊節點邊堵(資料量大導緻,非熱點)。那麼嫌少流入JOIN的資料,比如構造PK source就會大大減少JOIN資料的膨脹。這裡不再重複舉例,大家可以查閱 《Apache Flink 漫談系列 - 持續查詢(Continuous Queries)》 的雙流JOIN示例部分。

NULL造成的熱點

比如我們有A LEFT JOIN  B ON A.aCol = B.bCol LEFT JOIN  C ON B.cCol = C.cCol 的業務,JOB的DAG如下:

假設在實際業務中有這樣的特點,大部分時候當A事件流入的時候,B還沒有可以JOIN的資料,但是B來的時候,A已經有可以JOIN的資料了,這特點就會導緻,A LEFT JOIN B 會産生大量的 (A, NULL),其中包括B裡面的 cCol 列也是NULL,這時候當與C進行LEFT JOIN的時候,首先Blink内部會利用cCol對AB的JOIN産生的事件流進行Shuffle, cCol是NULL進而是下遊節點大量的NULL事件流入,造成熱點。那麼這問題如何解決呢?

我們可以改變JOIN的先後順序,來保證A LEFT JOIN B 不會産生NULL的熱點問題,如下:

JOIN ReOrder

對于JOIN算子的實作我們知道左右兩邊的事件都會存儲到State中,在流入事件時候在從另一邊讀取所有事件進行JOIN計算,這樣的實作邏輯在資料量很大的場景會有一定的state操作瓶頸,我們某些場景可以通過業務角度調整JOIN的順序,來消除性能瓶頸,比如:A JOIN B ON A.acol = B.bcol  JOIN  C ON B.bcol = C.ccol. 這樣的場景,如果 A與B進行JOIN産生資料量很大,但是B與C進行JOIN産生的資料量很小,那麼我們可以強制調整JOIN的聯接順序,B JOIN C ON b.bcol = c.ccol JOIN A ON a.acol = b.bcol. 如下示意圖:

小結

繼續閱讀