聊什麼
在《SQL概覽》中我們介紹了JOIN算子的語義和基本的使用方式,介紹過程中大家發現Apache Flink在文法語義上是遵循ANSI-SQL标準的,那麼再深思一下傳統資料庫為啥需要有JOIN算子呢?在實作原理上面Apache Flink内部實作和傳統資料庫有什麼差別呢?本篇将詳盡的為大家介紹傳統資料庫為什麼需要JOIN算子,以及JOIN算子在Apache Flink中的底層實作原理和在實際使用中的優化!
什麼是JOIN
在《Apache Flink 漫談系列 - SQL概覽》中我對JOIN算子有過簡單的介紹,這裡我們以具體執行個體的方式讓大家對JOIN算子加深印象。JOIN的本質是分别從N(N>=1)張表中擷取不同的字段,進而得到最完整的記錄行。比如我們有一個查詢需求:在學生表(學号,姓名,性别),課程表(課程号,課程名,學分)和成績表(學号,課程号,分數)中查詢所有學生的姓名,課程名和考試分數。如下
為啥需要JOIN
JOIN的本質是資料拼接,那麼如果我們将所有資料列存儲在一張大表中,是不是就不需要JOIN了呢?如果真的能将所需的資料都在一張表存儲,我想就真的不需要JOIN的算子了,但現實業務中真的能做到将所需資料放到同一張大表裡面嗎?答案是否定的,核心原因有2個:
- 産生資料的源頭可能不是一個系統;
- 産生資料的源頭是同一個系統,但是資料備援的沉重代價,迫使我們會遵循資料庫範式,進行表的設計。簡說NF如下:
- 1NF - 列不可再分;
- 2NF - 符合1NF,并且非主鍵屬性全部依賴于主鍵屬性;
- 3NF - 符合2NF,并且消除傳遞依賴,即:任何字段不能由其他字段派生出來;
- BCNF - 符合3NF,并且主鍵屬性之間無依賴關系。
當然還有 4NF,5NF,不過在實際的資料庫設計過程中做到BCNF已經足夠了!(并非否定4NF,5NF存在的意義,隻是個人還沒有遇到一定要用4NF,5NF的場景,設計往往會按存儲成本,查詢性能等綜合因素考量)
JOIN種類
JOIN 在傳統資料庫中有如下分類:
- CROSS JOIN - 交叉連接配接,計算笛卡兒積;
- INNER JOIN - 内連接配接,傳回滿足條件的記錄;
- OUTER JOIN
- LEFT - 傳回左表所有行,右表不存在補NULL;
- RIGHT - 傳回右表所有行,左邊不存在補NULL;
- FULL - 傳回左表和右表的并集,不存在一邊補NULL;
- SELF JOIN - 自連接配接,将表查詢時候命名不同的别名。
JOIN文法
JOIN 在SQL89和SQL92中有不同的文法,以INNER JOIN為例說明:
- SQL89 - 表之間用“,”逗号分割,連結條件和過濾條件都在Where子句指定:
SELECT
a.colA,
b.colA
FROM
tab1 AS a , tab2 AS b
WHERE a.id = b.id and a.other > b.other
- SQL92 - SQL92将連結條件在ON子句指定,過濾條件在WHERE子句指定,邏輯更為清晰:
SELECT
a.colA,
b.colA
FROM
tab1 AS a JOIN tab2 AS b ON a.id = b.id
WHERE
a.other > b.other
本篇中的後續示例将應用SQL92文法進行SQL的編寫,文法如下:
tableExpression [ LEFT|RIGHT|FULL|INNER|SELF ] JOIN tableExpression [ ON joinCondition ] [WHERE filterCondition]
語義示例說明
在《SQL概覽》中對JOIN語義有過簡單介紹,這裡會進行展開介紹。 我們以開篇示例中的三張表:學生表(學号,姓名,性别),課程表(課程号,課程名,學分)和成績表(學号,課程号,分數)來介紹各種JOIN的語義。
CROSS JOIN
交叉連接配接會對兩個表進行笛卡爾積,也就是LEFT表的每一行和RIGHT表的所有行進行聯接,是以生成結果表的行數是兩個表行數的乘積,如student和course表的CROSS JOIN結果如下:
mysql> SELECT * FROM student JOIN course;
+------+-------+------+-----+-------+--------+
| no | name | sex | no | name | credit |
+------+-------+------+-----+-------+--------+
| S001 | Sunny | M | C01 | Java | 2 |
| S002 | Tom | F | C01 | Java | 2 |
| S003 | Kevin | M | C01 | Java | 2 |
| S001 | Sunny | M | C02 | Blink | 3 |
| S002 | Tom | F | C02 | Blink | 3 |
| S003 | Kevin | M | C02 | Blink | 3 |
| S001 | Sunny | M | C03 | Spark | 3 |
| S002 | Tom | F | C03 | Spark | 3 |
| S003 | Kevin | M | C03 | Spark | 3 |
+------+-------+------+-----+-------+--------+
9 rows in set (0.00 sec)
如上結果我們得到9行=student(3) x course(3)。交叉聯接一般會消耗較大的資源,也被很多使用者質疑交叉聯接存在的意義?(任何時候我們都有質疑的權利,同時也建議我們養成自己質疑自己“質疑”的習慣,就像小時候不了解父母的“廢話”一樣)。
我們以開篇的示例說明交叉聯接的巧妙之一,開篇中我們的查詢需求是:在學生表(學号,姓名,性别),課程表(課程号,課程名,學分)和成績表(學号,課程号,分數)中查詢所有學生的姓名,課程名和考試分數。開篇中的SQL語句得到的結果如下:
mysql> SELECT
-> student.name, course.name, score
-> FROM student JOIN score ON student.no = score.s_no
-> JOIN course ON score.c_no = course.no;
+-------+-------+-------+
| name | name | score |
+-------+-------+-------+
| Sunny | Java | 80 |
| Sunny | Blink | 98 |
| Sunny | Spark | 76 |
| Kevin | Java | 78 |
| Kevin | Blink | 88 |
| Kevin | Spark | 68 |
+-------+-------+-------+
6 rows in set (0.00 sec)
如上INNER JOIN的結果我們發現少了Tom同學的成績,原因是Tom同學沒有參加考試,在score表中沒有Tom的成績,但是我們可能希望雖然Tom沒有參加考試但仍然希望Tom的成績能夠在查詢結果中顯示(成績 0 分),面對這樣的需求,我們怎麼處理呢?交叉聯接可以幫助我們:
- 第一步 student和course 進行交叉聯接:
mysql> SELECT
-> stu.no, c.no, stu.name, c.name
-> FROM student stu JOIN course c 笛卡爾積
-> ORDER BY stu.no; -- 排序隻是友善大家檢視:)
+------+-----+-------+-------+
| no | no | name | name |
+------+-----+-------+-------+
| S001 | C03 | Sunny | Spark |
| S001 | C01 | Sunny | Java |
| S001 | C02 | Sunny | Blink |
| S002 | C03 | Tom | Spark |
| S002 | C01 | Tom | Java |
| S002 | C02 | Tom | Blink |
| S003 | C02 | Kevin | Blink |
| S003 | C03 | Kevin | Spark |
| S003 | C01 | Kevin | Java |
+------+-----+-------+-------+
9 rows in set (0.00 sec)
- 第二步 将交叉聯接的結果與score表進行左外聯接,如下:
mysql> SELECT
-> stu.no, c.no, stu.name, c.name,
-> CASE
-> WHEN s.score IS NULL THEN 0
-> ELSE s.score
-> END AS score
-> FROM student stu JOIN course c -- 迪卡爾積
-> LEFT JOIN score s ON stu.no = s.s_no and c.no = s.c_no -- LEFT OUTER JOIN
-> ORDER BY stu.no; -- 排序隻是為了大家好看一點:)
+------+-----+-------+-------+-------+
| no | no | name | name | score |
+------+-----+-------+-------+-------+
| S001 | C03 | Sunny | Spark | 76 |
| S001 | C01 | Sunny | Java | 80 |
| S001 | C02 | Sunny | Blink | 98 |
| S002 | C02 | Tom | Blink | 0 | -- TOM 雖然沒有參加考試,但是仍然看到他的資訊
| S002 | C03 | Tom | Spark | 0 |
| S002 | C01 | Tom | Java | 0 |
| S003 | C02 | Kevin | Blink | 88 |
| S003 | C03 | Kevin | Spark | 68 |
| S003 | C01 | Kevin | Java | 78 |
+------+-----+-------+-------+-------+
9 rows in set (0.00 sec)
經過CROSS JOIN幫我們将Tom的資訊也查詢出來了!(TOM 雖然沒有參加考試,但是仍然看到他的資訊)
INNER JOIN
内聯接在SQL92中 ON 表示聯接添加,可選的WHERE子句表示過濾條件,如開篇的示例就是一個多表的内聯接,我們在看一個簡單的示例: 查詢成績大于80分的學生學号,學生姓名和成績:
mysql> SELECT
-> stu.no, stu.name , s.score
-> FROM student stu JOIN score s ON stu.no = s.s_no
-> WHERE s.score > 80;
+------+-------+-------+
| no | name | score |
+------+-------+-------+
| S001 | Sunny | 98 |
| S003 | Kevin | 88 |
+------+-------+-------+
2 rows in set (0.00 sec)
上面按語義的邏輯是:
- 第一步:先進行student和score的内連接配接,如下:
mysql> SELECT
-> stu.no, stu.name , s.score
-> FROM student stu JOIN score s ON stu.no = s.s_no ;
+------+-------+-------+
| no | name | score |
+------+-------+-------+
| S001 | Sunny | 80 |
| S001 | Sunny | 98 |
| S001 | Sunny | 76 |
| S003 | Kevin | 78 |
| S003 | Kevin | 88 |
| S003 | Kevin | 68 |
+------+-------+-------+
6 rows in set (0.00 sec)
- 第二步:對内聯結果進行過濾, score > 80 得到,如下最終結果:
-> WHERE s.score > 80;
+------+-------+-------+
| no | name | score |
+------+-------+-------+
| S001 | Sunny | 98 |
| S003 | Kevin | 88 |
+------+-------+-------+
2 rows in set (0.00 sec)
上面的查詢過程符合語義,但是如果在filter條件能過濾很多資料的時候,先進行資料的過濾,在進行内聯接會擷取更好的性能,比如我們手工寫一下:
mysql> SELECT
-> no, name , score
-> FROM student stu JOIN ( SELECT s_no, score FROM score s WHERE s.score >80) as sc ON no = s_no;
+------+-------+-------+
| no | name | score |
+------+-------+-------+
| S001 | Sunny | 98 |
| S003 | Kevin | 88 |
+------+-------+-------+
2 rows in set (0.00 sec)
上面寫法語義和第一種寫法語義一緻,得到相同的查詢結果,上面查詢過程是:
- 第一步:執行過濾子查詢
mysql> SELECT s_no, score FROM score s WHERE s.score >80;
+------+-------+
| s_no | score |
+------+-------+
| S001 | 98 |
| S003 | 88 |
+------+-------+
2 rows in set (0.00 sec)
- 第二步:執行内連接配接
-> ON no = s_no;
+------+-------+-------+
| no | name | score |
+------+-------+-------+
| S001 | Sunny | 98 |
| S003 | Kevin | 88 |
+------+-------+-------+
2 rows in set (0.00 sec)
如上兩種寫法在語義上一緻,但查詢性能在數量很大的情況下會有很大差距。上面為了和大家示範相同的查詢語義,可以有不同的查詢方式,不同的執行計劃。實際上資料庫本身的優化器會自動進行查詢優化,在内聯接中ON的聯接條件和WHERE的過濾條件具有相同的優先級,具體的執行順序可以由資料庫的優化器根據性能消耗決定。也就是說實體執行計劃可以先執行過濾條件進行查詢優化,如果細心的讀者可能發現,在第二個寫法中,子查詢我們不但有行的過濾,也進行了列的裁剪(去除了對查詢結果沒有用的c_no列),這兩個變化實際上對應了資料庫中兩個優化規則:
- filter push down
- project push down
如上優化規則以filter push down 為例,示意優化器對執行plan的優化變動:
LEFT OUTER JOIN
左外聯接語義是傳回左表所有行,右表不存在補NULL,為了示範作用,我們查詢沒有參加考試的所有學生的成績單:
mysql> SELECT
-> no, name , s.c_no, s.score
-> FROM student stu LEFT JOIN score s ON stu.no = s.s_no
-> WHERE s.score is NULL;
+------+------+------+-------+
| no | name | c_no | score |
+------+------+------+-------+
| S002 | Tom | NULL | NULL |
+------+------+------+-------+
1 row in set (0.00 sec)
上面查詢的執行邏輯上也是分成兩步:
- 第一步:左外聯接查詢
mysql> SELECT
-> no, name , s.c_no, s.score
-> FROM student stu LEFT JOIN score s ON stu.no = s.s_no;
+------+-------+------+-------+
| no | name | c_no | score |
+------+-------+------+-------+
| S001 | Sunny | C01 | 80 |
| S001 | Sunny | C02 | 98 |
| S001 | Sunny | C03 | 76 |
| S002 | Tom | NULL | NULL | -- 右表不存在的補NULL
| S003 | Kevin | C01 | 78 |
| S003 | Kevin | C02 | 88 |
| S003 | Kevin | C03 | 68 |
+------+-------+------+-------+
7 rows in set (0.00 sec)
- 第二步:過濾查詢
mysql> SELECT
-> no, name , s.c_no, s.score
-> FROM student stu LEFT JOIN score s ON stu.no = s.s_no
-> WHERE s.score is NULL;
+------+------+------+-------+
| no | name | c_no | score |
+------+------+------+-------+
| S002 | Tom | NULL | NULL |
+------+------+------+-------+
1 row in set (0.00 sec)
這兩個過程和上面分析的INNER JOIN一樣,但是這時候能否利用上面說的 filter push down的優化呢?根據LEFT OUTER JOIN的語義來講,答案是否定的。我們手工操作看一下:
- 第一步:先進行過濾查詢(獲得一個空表)
mysql> SELECT * FROM score s WHERE s.score is NULL;
Empty set (0.00 sec)
- 第二步: 進行左外連結
mysql> SELECT
-> no, name , s.c_no, s.score
-> FROM student stu LEFT JOIN (SELECT * FROM score s WHERE s.score is NULL) AS s ON stu.no = s.s_no;
+------+-------+------+-------+
| no | name | c_no | score |
+------+-------+------+-------+
| S001 | Sunny | NULL | NULL |
| S002 | Tom | NULL | NULL |
| S003 | Kevin | NULL | NULL |
+------+-------+------+-------+
3 rows in set (0.00 sec)
我們發現兩種寫法的結果不一緻,第一種寫法隻傳回Tom沒有參加考試,是我們預期的。第二種寫法傳回了Sunny,Tom和Kevin三名同學都沒有參加考試,這明顯是非預期的查詢結果。所有LEFT OUTER JOIN不能利用INNER JOIN的 filter push down優化。
RIGHT OUTER JOIN
右外連結語義是傳回右表所有行,左邊不存在補NULL,如下:
mysql> SELECT
-> s.c_no, s.score, no, name
-> FROM score s RIGHT JOIN student stu ON stu.no = s.s_no;
+------+-------+------+-------+
| c_no | score | no | name |
+------+-------+------+-------+
| C01 | 80 | S001 | Sunny |
| C02 | 98 | S001 | Sunny |
| C03 | 76 | S001 | Sunny |
| NULL | NULL | S002 | Tom | -- 左邊沒有的進行補 NULL
| C01 | 78 | S003 | Kevin |
| C02 | 88 | S003 | Kevin |
| C03 | 68 | S003 | Kevin |
+------+-------+------+-------+
7 rows in set (0.00 sec)
上面右外連結我隻是将上面左外連結查詢的左右表交換了一下:)。
FULL OUTER JOIN
全外連結語義傳回左表和右表的并集,不存在一邊補NULL,用于示範的MySQL資料庫不支援FULL OUTER JOIN。這裡不做示範了。
SELF JOIN
上面介紹的INNER JOIN、OUTER JOIN都是不同表之間的聯接查詢,自聯接是一張表以不同的别名做為左右兩個表,可以進行如上的INNER JOIN和OUTER JOIN。如下看一個INNER 自聯接:
mysql> SELECT * FROM student l JOIN student r where l.no = r.no;
+------+-------+------+------+-------+------+
| no | name | sex | no | name | sex |
+------+-------+------+------+-------+------+
| S001 | Sunny | M | S001 | Sunny | M |
| S002 | Tom | F | S002 | Tom | F |
| S003 | Kevin | M | S003 | Kevin | M |
+------+-------+------+------+-------+------+
3 rows in set (0.00 sec)
不等值聯接
這裡說的不等值聯接是SQL92文法裡面的ON子句裡面隻有不等值聯接,比如:
mysql> SELECT
-> s.c_no, s.score, no, name
-> FROM score s RIGHT JOIN student stu ON stu.no != s.c_no;
+------+-------+------+-------+
| c_no | score | no | name |
+------+-------+------+-------+
| C01 | 80 | S001 | Sunny |
| C01 | 80 | S002 | Tom |
| C01 | 80 | S003 | Kevin |
| C02 | 98 | S001 | Sunny |
| C02 | 98 | S002 | Tom |
| C02 | 98 | S003 | Kevin |
| C03 | 76 | S001 | Sunny |
| C03 | 76 | S002 | Tom |
| C03 | 76 | S003 | Kevin |
| C01 | 78 | S001 | Sunny |
| C01 | 78 | S002 | Tom |
| C01 | 78 | S003 | Kevin |
| C02 | 88 | S001 | Sunny |
| C02 | 88 | S002 | Tom |
| C02 | 88 | S003 | Kevin |
| C03 | 68 | S001 | Sunny |
| C03 | 68 | S002 | Tom |
| C03 | 68 | S003 | Kevin |
+------+-------+------+-------+
18 rows in set (0.00 sec)
上面這示例,其實沒有什麼實際業務價值,在實際的使用場景中,不等值聯接往往是結合等值聯接,将不等值條件在WHERE子句指定,即, 帶有WHERE子句的等值聯接。
Apache Flink雙流JOIN
CROSS
INNER
OUTER
SELF
ON
WHERE
Apache Flink | N | Y | 必選 | 可選 |
Apache Flink目前支援INNER JOIN和LEFT OUTER JOIN(SELF 可以轉換為普通的INNER和OUTER)。在語義上面Apache Flink嚴格遵守标準SQL的語義,與上面示範的語義一緻。下面我重點介紹Apache Flink中JOIN的實作原理。
雙流JOIN與傳統資料庫表JOIN的差別
傳統資料庫表的JOIN是兩張靜态表的資料聯接,在流上面是 動态表(關于流與動态表的關系請查閱 《Apache Flink 漫談系列 - 流表對偶(duality)性)》,雙流JOIN的資料不斷流入與傳統資料庫表的JOIN有如下3個核心差別:
- 左右兩邊的資料集合無窮 - 傳統資料庫左右兩個表的資料集合是有限的,雙流JOIN的資料會源源不斷的流入;
- JOIN的結果不斷産生/更新 - 傳統資料庫表JOIN是一次執行産生最終結果後退出,雙流JOIN會持續不斷的産生新的結果。在 《Apache Flink 漫談系列 - 持續查詢(Continuous Queries)》篇也有相關介紹。
- 查詢計算的雙邊驅動 - 雙流JOIN由于左右兩邊的流的速度不一樣,會導緻左邊資料到來的時候右邊資料還沒有到來,或者右邊資料到來的時候左邊資料沒有到來,是以在實作中要将左右兩邊的流資料進行儲存,以保證JOIN的語義。在Blink中會以State的方式進行資料的存儲。State相關請檢視《Apache Flink 漫談系列 - State》篇。
資料Shuffle
分布式流計算所有資料會進行Shuffle,怎麼才能保障左右兩邊流的要JOIN的資料會在相同的節點進行處理呢?在雙流JOIN的場景,我們會利用JOIN中ON的聯接key進行partition,確定兩個流相同的聯接key會在同一個節點處理。
資料的儲存
不論是INNER JOIN還是OUTER JOIN 都需要對左右兩邊的流的資料進行儲存,JOIN算子會開辟左右兩個State進行資料存儲,左右兩邊的資料到來時候,進行如下操作:
- LeftEvent到來存儲到LState,RightEvent到來的時候存儲到RState;
- LeftEvent會去RightState進行JOIN,并發出所有JOIN之後的Event到下遊;
- RightEvent會去LeftState進行JOIN,并發出所有JOIN之後的Event到下遊。
簡單場景介紹實作原理
INNER JOIN 實作
JOIN有很多複雜的場景,我們先以最簡單的場景進行實作原理的介紹,比如:最直接的兩個進行INNER JOIN,比如查詢産品庫存和訂單數量,庫存變化事件流和訂單事件流進行INNER JOIN,JION條件是産品ID,具體如下:
雙流JOIN兩邊事件都會存儲到State裡面,如上,事件流按照标号先後流入到join節點,我們假設右邊流比較快,先流入了3個事件,3個事件會存儲到state中,但因為左邊還沒有資料,所有右邊前3個事件流入時候,沒有join結果流出,當左邊第一個事件序号為4的流入時候,先存儲左邊state,再與右邊已經流入的3個事件進行join,join的結果如圖 三行結果會流入到下遊節點sink。當第5号事件流入時候,也會和左邊第4号事件進行join,流出一條jion結果到下遊節點。這裡關于INNER JOIN的語義和大家強調兩點:
- INNER JOIN隻有符合JOIN條件時候才會有JOIN結果流出到下遊,比如右邊最先來的1,2,3個事件,流入時候沒有任何輸出,因為左邊還沒有可以JOIN的事件;
- INNER JOIN兩邊的資料不論如何亂序,都能夠保證和傳統資料庫語義一緻,因為我們儲存了左右兩個流的所有事件到state中。
LEFT OUTER JOIN 實作
LEFT OUTER JOIN 可以簡寫 LEFT JOIN,語義上和INNER JOIN的差別是不論右流是否有JOIN的事件,左流的事件都需要流入下遊節點,但右流沒有可以JION的事件時候,右邊的事件補NULL。同樣我們以最簡單的場景說明LEFT JOIN的實作,比如查詢産品庫存和訂單數量,庫存變化事件流和訂單事件流進行LEFT JOIN,JION條件是産品ID,具體如下:
下圖也是表達LEFT JOIN的語義,隻是展現方式不同:
上圖主要關注點是當左邊先流入1,2事件時候,右邊沒有可以join的事件時候會向下遊發送左邊事件并補NULL向下遊發出,當右邊第一個相同的Join key到來的時候會将左邊先來的事件發出的帶有NULL的事件撤回(對應上面command的-記錄,+代表正向記錄,-代表撤回記錄)。這裡強調三點:
- 左流的事件當右邊沒有JOIN的事件時候,将右邊事件列補NULL後流向下遊;* 當右邊事件流入發現左邊已經有可以JOIN的key的時候,并且是第一個可以JOIN上的右邊事件(比如上面的3事件是第一個可以和左邊JOIN key P001進行JOIN的事件)需要撤回左邊下發的NULL記錄,并下發JOIN完整(帶有右邊事件列)的事件到下遊。後續來的4,5,6,8等待後續P001的事件是不會産生撤回記錄的。
- 在Apache Flink系統内部事件類型分為正向事件标記為“+”和撤回事件标記為“-”。
RIGHT OUTER JOIN 和 FULL OUTER JOIN
RIGHT JOIN内部實作與LEFT JOIN類似, FULL JOIN和LEFT JOIN的差別是左右兩邊都會産生補NULL和撤回的操作。對于State的使用都是相似的,這裡不再重複說明了。
複雜場景介紹State結構
上面我們介紹了雙流JOIN會使用State記錄左右兩邊流的事件,同時我們示例資料的場景也是比較簡單,比如流上沒有更新事件(沒有撤回事件),同時流上沒有重複行事件。那麼我們嘗試思考下面的事件流在雙流JOIN時候是怎麼處理的?
上圖示例是連續産生了2筆銷售數量一樣的訂單,同時在産生一筆銷售數量為5的訂單之後,又将該訂單取消了(或者退貨了),這樣在事件流上面就會是上圖的示意,這種情況Blink内部如何支撐呢?
根據JOIN的語義以INNER JOIN為例,右邊有兩條相同的訂單流入,我們就應該向下遊輸出兩條JOIN結果,當有撤回的事件流入時候,我們也需要将已經下發下遊的JOIN事件撤回,如下:
上面的場景以及LEFT JOIN部分介紹的撤回情況,Apache Flink内部需要處理如下幾個核心點:
- 記錄重複記錄(完整記錄重複記錄或者記錄相同記錄的個數)
- 記錄正向記錄和撤回記錄(完整記錄正向和撤回記錄或者記錄個數)
- 記錄哪一條事件是第一個可以與左邊事件進行JOIN的事件
雙流JOIN的State資料結構
在Apache Flink内部對不同的場景有特殊的資料結構優化,本篇我們隻針對上面說的情況(通用設計)介紹一下雙流JOIN的State的資料結構和用途:
資料結構
- Map<JoinKey, Map<rowData, count>>;
- 第一級MAP的key是Join key,比如示例中的P001, value是流上面的所有完整事件;
- 第二級MAP的key是行資料,比如示例中的P001, 2,value是相同僚件值的個數
資料結構的利用
- 記錄重複記錄 - 利用第二級MAP的value記錄重複記錄的個數,這樣大大減少存儲和讀取
- 正向記錄和撤回記錄 - 利用第二級MAP的value記錄,當count=0時候删除該元素
- 判斷右邊是否産生撤回記錄 - 根據第一級MAP的value的size來判斷是否産生撤回,隻有size由0變成1的時候(第一條和左可以JOIN的事件)才産生撤回
雙流JOIN的應用優化
構造更新流
我們在 《Apache Flink 漫談系列 - 持續查詢(Continuous Queries)》篇中以雙流JOIN為例介紹了如何構造業務上的PK source,構造PK source本質上在保證業務語義的同時也是對雙流JOIN的一種優化,比如多級LEFT JOIN會讓流上的資料不斷膨脹,造成JOIN節點性能較慢,JOIN之後的下遊節點邊堵(資料量大導緻,非熱點)。那麼嫌少流入JOIN的資料,比如構造PK source就會大大減少JOIN資料的膨脹。這裡不再重複舉例,大家可以查閱 《Apache Flink 漫談系列 - 持續查詢(Continuous Queries)》 的雙流JOIN示例部分。
NULL造成的熱點
比如我們有A LEFT JOIN B ON A.aCol = B.bCol LEFT JOIN C ON B.cCol = C.cCol 的業務,JOB的DAG如下:
假設在實際業務中有這樣的特點,大部分時候當A事件流入的時候,B還沒有可以JOIN的資料,但是B來的時候,A已經有可以JOIN的資料了,這特點就會導緻,A LEFT JOIN B 會産生大量的 (A, NULL),其中包括B裡面的 cCol 列也是NULL,這時候當與C進行LEFT JOIN的時候,首先Blink内部會利用cCol對AB的JOIN産生的事件流進行Shuffle, cCol是NULL進而是下遊節點大量的NULL事件流入,造成熱點。那麼這問題如何解決呢?
我們可以改變JOIN的先後順序,來保證A LEFT JOIN B 不會産生NULL的熱點問題,如下:
JOIN ReOrder
對于JOIN算子的實作我們知道左右兩邊的事件都會存儲到State中,在流入事件時候在從另一邊讀取所有事件進行JOIN計算,這樣的實作邏輯在資料量很大的場景會有一定的state操作瓶頸,我們某些場景可以通過業務角度調整JOIN的順序,來消除性能瓶頸,比如:A JOIN B ON A.acol = B.bcol JOIN C ON B.bcol = C.ccol. 這樣的場景,如果 A與B進行JOIN産生資料量很大,但是B與C進行JOIN産生的資料量很小,那麼我們可以強制調整JOIN的聯接順序,B JOIN C ON b.bcol = c.ccol JOIN A ON a.acol = b.bcol. 如下示意圖:
小結