天天看點

一招教你資料倉庫如何高效批量導入與更新資料

摘要:GaussDB(DWS)支援的MERGE INTO功能,可以同時進行大資料量的更新與插入。對于資料倉庫是一項非常重要的技術。

本文分享自華為雲社群《一招教你如何高效批量導入與更新資料》,原文作者:acydy。

前言

如果有一張表,我們既想對它更新,又想對它插入應該如何操作? 可以使用UPDATE和INSERT完成你的目标。

如果你的資料量很大,想盡快完成任務執行,可否有其他方案?那一定不要錯過GaussDB(DWS)的MERGE INTO功能。

MERGE INTO 概念

MERGE INTO是SQL 2003引入的标準。

If a table T, as well as being updatable, is insertable-into, then rows can be inserted into it (subject to applicable Access Rules and Conformance Rules). The primary effect of an <insert statement> on T is to insert into T each of the zero or more rows contained in a specified table. The primary effect of a <merge statement> on T is to replace zero or more rows in T with specified rows and/or to insert into T zero or more specified rows, depending on the result of a <search condition> and on whether one or both of <merge when matched clause> and <merge when not matched clause> are specified.

一張表在一條語句裡面既可以被更新,也可以被插入。是否被更新還是插入取決于search condition的結果和指定的merge when matched clause(當condition比對時做什麼操作)和merge when not matched clause(當condition不比對時做什麼操作)文法。

SQL 2008進行了擴充,可以使用多個MATCHED 和NOT MATCHED 。

MERGE has been extended to support multiple MATCHED and NOT MATCHED clauses, each accompanied by a search condition, that gives much greater flexibility in the coding of complex MERGE statements to handle update conflicts.

MERGE INTO 指令涉及到兩張表。目标表:被插入或者更新的表。源表:用于跟目标表進行比對的表,目标表的資料來源。

MERGE INTO語句将目标表和源表中資料針對關聯條件進行比對,若關聯條件比對時對目标表進行UPDATE,無法比對時對目标表執行INSERT。

使用場景:當業務中需要将一個表中大量資料添加到現有表時,使用MERGE INTO 可以高效地将資料導入,避免多次INSERT+UPDATE操作。

MERGE INTO 文法

GaussDB(DWS) MERGE INTO 文法如下:

MERGE INTO table_name [ [ AS ] alias ]
USING { { table_name | view_name } | subquery } [ [ AS ] alias ]
ON ( condition )
[
  WHEN MATCHED THEN
  UPDATE SET { column_name = { expression | DEFAULT } |
          ( column_name [, ...] ) = ( { expression | DEFAULT } [, ...] ) } [, ...]
  [ WHERE condition ]
]
[
  WHEN NOT MATCHED THEN
  INSERT { DEFAULT VALUES |
  [ ( column_name [, ...] ) ] VALUES ( { expression | DEFAULT } [, ...] ) [, ...] [ WHERE condition ] }
];      
  • INTO 指定目标表。
  • USING 指定源表。源表可以是普通表,也可以是子查詢。
  • ON 關聯條件,用于指定目标表和源表的關聯條件。
  • WHEN MATCHED 當源表和目标表中資料可以比對關聯條件時,選擇WHEN MATCHED子句執行UPDATE操作。
  • WHEN NOT MATCHED 當源表和目标表中資料無法比對關聯條件時,選擇WHEN NOT MATCHED子句執行INSERT操作。
    • WHEN MATCHED,WHEN NOT MATCHED 可以預設一個,不能指定多個。
    • WHEN MATCHED,WHEN NOT MATCHED 可以使用WHERE進行條件過濾。
    • WHEN MATCHED,WHEN NOT MATCHED 順序可以交換。

實戰應用

首先建立好下面幾張表,用于執行MREGE INTO 操作。

gaussdb=# CREATE TABLE dst (
  product_id INT,
  product_name VARCHAR(20),
  category VARCHAR(20),
  total INT
) DISTRIBUTE BY HASH(product_id);

gaussdb=# CREATE TABLE dst_data (
  product_id INT,
  product_name VARCHAR(20),
  category VARCHAR(20),
  total INT
) DISTRIBUTE BY HASH(product_id);

gaussdb=# CREATE TABLE src (
  product_id INT,
  product_name VARCHAR(20),
  category VARCHAR(20),
  total INT
) DISTRIBUTE BY HASH(product_id);

gaussdb=# INSERT INTO dst_data VALUES(1601,'lamaze','toys',100),(1600,'play gym','toys',100),(1502,'olympus','electrncs',100),(1501,'vivitar','electrnc',100),(1666,'harry potter','dvd',100);
gaussdb=# INSERT INTO src VALUES(1700,'wait interface','books',200),(1666,'harry potter','toys',200),(1601,'lamaze','toys',200),(1502,'olympus camera','electrncs',200);
gaussdb=# INSERT INTO dst SELECT * FROM dst_data;      

同時指定WHEN MATCHED 與WHEN NOT MATCHED

  • 檢視計劃,看下MERGE INTO是如何執行的。

MERGE INTO轉化成JOIN将兩個表進行關聯處理,關聯條件就是ON後指定的條件。

gaussdb=# EXPLAIN (COSTS off)
MERGE INTO dst x
USING src y
ON x.product_id = y.product_id
WHEN MATCHED THEN
  UPDATE SET product_name = y.product_name, category = y.category, total = y.total
WHEN NOT MATCHED THEN
  INSERT VALUES (y.product_id, y.product_name, y.category, y.total);
 
                    QUERY PLAN
--------------------------------------------------
  id |                operation
-----+--------------------------------------------
   1 | ->  Streaming (type: GATHER)
   2 |    ->  Merge on dst x
   3 |       ->  Streaming(type: REDISTRIBUTE)
   4 |          ->  Hash Left Join (5, 6)
   5 |             ->  Seq Scan on src y
   6 |             ->  Hash
   7 |                ->  Seq Scan on dst x

  Predicate Information (identified by plan id)
 ------------------------------------------------
   4 --Hash Left Join (5, 6)
         Hash Cond: (y.product_id = x.product_id)
(14 rows)      

為什麼這裡轉化成了LEFT JOIN?

由于需要在目标表與源表比對時更新目标表,不比對時向目标表插入資料。也就是源表的一部分資料用于更新目标表,另一部分用于向目标表插入。與LEFT JOIN語義是相似的。

5 --Seq Scan on public.src y
         Output: y.product_id, y.product_name, y.category, y.total, y.ctid
         Distribute Key: y.product_id
   6 --Hash
         Output: x.product_id, x.product_name, x.category, x.total, x.ctid, x.xc_node_id
   7 --Seq Scan on public.dst x
         Output: x.product_id, x.product_name, x.category, x.total, x.ctid, x.xc_node_id
         Distribute Key: x.product_id      
  • 執行MERGE INTO,檢視結果。

兩張表在product_id是1502,1601,1666時可以關聯,是以這三條記錄被更新。src表product_id是1700時未比對,插入此條記錄。其他未修改。

gaussdb=# SELECT * FROM dst ORDER BY 1;
 product_id | product_name | category  | total
------------+--------------+-----------+-------
       1501 | vivitar      | electrnc  |   100
       1502 | olympus      | electrncs |   100
       1600 | play gym     | toys      |   100       
       1601 | lamaze       | toys      |   100
       1666 | harry potter | dvd       |   100      
(5 rows)

gaussdb=# SELECT * FROM src ORDER BY 1;
 product_id |  product_name  | category  | total
------------+----------------+-----------+-------
       1502 | olympus camera | electrncs |   200
       1601 | lamaze         | toys      |   200       
       1666 | harry potter   | toys      |   200
       1700 | wait interface | books     |   200       
(4 rows)

gaussdb=# MERGE INTO dst x
USING src y
ON x.product_id = y.product_id
WHEN MATCHED THEN
  UPDATE SET product_name = y.product_name, category = y.category, total = y.total
WHEN NOT MATCHED THEN
  INSERT VALUES (y.product_id, y.product_name, y.category, y.total);
MERGE 4
gaussdb=# SELECT * FROM dst ORDER BY 1;
 product_id |  product_name  | category  | total
------------+----------------+-----------+-------
       1501 | vivitar        | electrnc  |   100  -- 未修改
       1502 | olympus camera | electrncs |   200  -- 更新
       1600 | play gym       | toys      |   100  -- 未修改
       1601 | lamaze         | toys      |   200  -- 更新
       1666 | harry potter   | toys      |   200  -- 更新
       1700 | wait interface | books     |   200  -- 插入
(6 rows)      
  • 檢視具體UPDATE、INSERT個數

可以通過EXPLAIN PERFORMANCE或者EXPLAIN ANALYZE檢視UPDATE、INSERT各自個數。(這裡僅顯示必要部分)

在Predicate Information部分可以看到總共插入一條,更新三條。

在Datanode Information部分可以看到每個節點的資訊。datanode1上更新2條,datanode2上插入一條,更新1條。

gaussdb=# EXPLAIN PERFORMANCE
MERGE INTO dst x
USING src y
ON x.product_id = y.product_id
WHEN MATCHED THEN
  UPDATE SET product_name = y.product_name, category = y.category, total = y.total
WHEN NOT MATCHED THEN
  INSERT VALUES (y.product_id, y.product_name, y.category, y.total);
 
  Predicate Information (identified by plan id)
 ------------------------------------------------
   2 --Merge on public.dst x
         Merge Inserted: 1
         Merge Updated: 3
 
                      Datanode Information (identified by plan id)
 ---------------------------------------------------------------------------------------
   2 --Merge on public.dst x
         datanode1 (Tuple Inserted 0, Tuple Updated 2)
         datanode2 (Tuple Inserted 1, Tuple Updated 1)        

省略WHEN NOT MATCHED 部分。

  • 這裡由于沒有WHEN NOT MATCHED部分,在兩個表不比對時不需要執行任何操作,也就不需要源表這部分的資料,所有隻需要inner join即可。
gaussdb=# EXPLAIN (COSTS off)
MERGE INTO dst x
USING src y
ON x.product_id = y.product_id
WHEN MATCHED THEN
  UPDATE SET product_name = y.product_name, category = y.category, total = y.total;
                    QUERY PLAN
--------------------------------------------------
  id |             operation
 ----+-----------------------------------
   1 | ->  Streaming (type: GATHER)
   2 |    ->  Merge on dst x
   3 |       ->  Hash Join (4,5)
   4 |          ->  Seq Scan on dst x
   5 |          ->  Hash
   6 |             ->  Seq Scan on src y

  Predicate Information (identified by plan id)
 ------------------------------------------------
   3 --Hash Join (4,5)
         Hash Cond: (x.product_id = y.product_id)
(13 rows)      
  • 執行後檢視結果。MERGE INTO隻操作了3條資料。
gaussdb=# truncate dst;
gaussdb=# INSERT INTO dst SELECT * FROM dst_data;
gaussdb=# MERGE INTO dst x
USING src y
ON x.product_id = y.product_id
WHEN MATCHED THEN
  UPDATE SET product_name = y.product_name, category = y.category, total = y.total;
MERGE 3
gaussdb=# SELECT * FROM dst;
 product_id |  product_name  | category  | total
------------+----------------+-----------+-------
       1501 | vivitar        | electrnc  |   100  -- 未修改
       1502 | olympus camera | electrncs |   200  -- 更新
       1600 | play gym       | toys      |   100  -- 未修改
       1601 | lamaze         | toys      |   200  -- 更新
       1666 | harry potter   | toys      |   200  -- 更新
(5 rows)      

省略WHEN NOT MATCHED

  • 隻有在不比對時進行插入。結果中沒有資料被更新。
gaussdb=# EXPLAIN (COSTS off)
MERGE INTO dst x
USING src y
ON x.product_id = y.product_id
WHEN NOT MATCHED THEN
  INSERT VALUES (y.product_id, y.product_name, y.category, y.total);
                    QUERY PLAN
--------------------------------------------------
  id |                operation
 ----+-----------------------------------------
   1 | ->  Streaming (type: GATHER)
   2 |    ->  Merge on dst x
   3 |       ->  Streaming(type: REDISTRIBUTE)
   4 |          ->  Hash Left Join (5, 6)
   5 |             ->  Seq Scan on src y
   6 |             ->  Hash
   7 |                ->  Seq Scan on dst x

  Predicate Information (identified by plan id)
 ------------------------------------------------
   4 --Hash Left Join (5, 6)
         Hash Cond: (y.product_id = x.product_id)
(14 rows)

gaussdb=# truncate dst;
gaussdb=# INSERT INTO dst SELECT * FROM dst_data;
gaussdb=# MERGE INTO dst x
USING src y
ON x.product_id = y.product_id
WHEN NOT MATCHED THEN
  INSERT VALUES (y.product_id, y.product_name, y.category, y.total);
MERGE 1
gaussdb=# SELECT * FROM dst ORDER BY 1;
 product_id |  product_name  | category  | total
------------+----------------+-----------+-------
       1501 | vivitar        | electrnc  |   100  -- 未修改
       1502 | olympus        | electrncs |   100  -- 未修改
       1600 | play gym       | toys      |   100  -- 未修改
       1601 | lamaze         | toys      |   100  -- 未修改
       1666 | harry potter   | dvd       |   100  -- 未修改
       1700 | wait interface | books     |   200  -- 插入
(6 rows)      

WHERE過濾條件

語義是在進行更新或者插入前判斷目前行是否滿足過濾條件,如果不滿足,就不進行更新或者插入。如果對于字段不想被更新,需要指定過濾條件。

下面例子在兩表可關聯時,隻會更新product_name = 'olympus’的行。在兩表無法關聯時且源表的product_id != 1700時才會進行插入。

gaussdb=# truncate dst;
gaussdb=# INSERT INTO dst SELECT * FROM dst_data;
gaussdb=# MERGE INTO dst x
USING src y
ON x.product_id = y.product_id
WHEN MATCHED THEN
  UPDATE SET product_name = y.product_name, category = y.category, total = y.total
  WHERE x.product_name = 'olympus'
WHEN NOT MATCHED THEN
  INSERT VALUES (y.product_id, y.product_name, y.category, y.total) WHERE y.product_id != 1700;
MERGE 1
gaussdb=# SELECT * FROM dst ORDER BY 1;
SELECT * FROM dst ORDER BY 1;
 product_id |  product_name  | category  | total
------------+----------------+-----------+-------
       1501 | vivitar        | electrnc  |   100
       1502 | olympus camera | electrncs |   200
       1600 | play gym       | toys      |   100
       1601 | lamaze         | toys      |   100
       1666 | harry potter   | dvd       |   100
(5 rows)      

子查詢

在USING部分可以使用子查詢,進行更複雜的關聯操作。

  • 對源表進行聚合操作的結果再與目标表比對
MERGE INTO dst x
USING (
  SELECT product_id, product_name, category, sum(total) AS total FROM src group by product_id, product_name, category
) y
ON x.product_id = y.product_id
WHEN MATCHED THEN
    UPDATE SET product_name = x.product_name, category = x.category, total = x.total
WHEN NOT MATCHED THEN
    INSERT VALUES (y.product_id, y.product_name, y.category, y.total + 200);      
  • 多個表UNION後的結果再與目标表比對
MERGE INTO dst x
USING (
  SELECT 1501 AS product_id, 'vivitar 35mm' AS product_name, 'electrncs' AS category, 100 AS total UNION ALL
  SELECT 1666 AS product_id, 'harry potter' AS product_name, 'dvd' AS category, 100 AS total
) y
ON x.product_id = y.product_id
WHEN MATCHED THEN
    UPDATE SET product_name = x.product_name, category = x.category, total = x.total
WHEN NOT MATCHED THEN
    INSERT VALUES (y.product_id, y.product_name, y.category, y.total + 200);      

存儲過程

gaussdb=# CREATE OR REPLACE PROCEDURE store_procedure1()
AS
BEGIN
  MERGE INTO dst x
  USING src y
  ON x.product_id = y.product_id
  WHEN MATCHED THEN
    UPDATE SET product_name = y.product_name, category = y.category, total = y.total;
END;
/

CREATE PROCEDURE
gaussdb=# CALL store_procedure1();      

MERGE INTO背後原理

上文提到了MREGE INTO轉化成LEFT JOIN或者INNER JOIN将目标表和源表進行關聯。那麼如何知道某一行要進行更新還是插入?

通過EXPLAIN VERBOSE檢視算子的輸出。掃描兩張表時都輸出了ctid列。那麼ctid列有什麼作用呢?

5 --Seq Scan on public.src y
         Output: y.product_id, y.product_name, y.category, y.total, y.ctid
         Distribute Key: y.product_id
   6 --Hash
         Output: x.product_id, x.product_name, x.category, x.total, x.ctid, x.xc_node_id
   7 --Seq Scan on public.dst x
         Output: x.product_id, x.product_name, x.category, x.total, x.ctid, x.xc_node_id
         Distribute Key: x.product_id      

ctid辨別了這一行在存儲上具體位置,知道了這個位置就可以對這個位置的資料進行更新。GaussDB(DWS)作為MPP分布式資料庫,還需要知道節點的資訊(xc_node_id)。UPDATE操作需要這兩個值。

在MREGE INTO這裡ctid還另有妙用。當目标表比對時需要更新,這是就保留本行ctid值。如果無法比對,插入即可。就不需要ctid,此時可認識ctid值是NULL。根據LEFT JOIN輸出的ctid結果是否為NULL,最終決定本行該被更新還是插入。

這樣在兩張表做完JOIN操作後,根據JOIN後輸出的ctid列,更新或者插入某一行。

注意事項

使用MERGE INTO時要注意比對條件是否合适。如果不注意,容易造成資料被非預期更新,可能整張表被更新。

總結

GAUSSDB(DWS)提供了高效的資料導入的功能MERGE INTO,對于資料倉庫是一項非常關鍵的功能。可以使用MERGE INTO 同時更新和插入一張表,在資料量非常大的情況下也能很快完成地資料導入。

想了解GuassDB(DWS)更多資訊,歡迎微信搜尋“GaussDB DWS”關注微信公衆号,和您分享最新最全的PB級數倉黑科技,背景還可擷取衆多學習資料哦~

點選關注,第一時間了解華為雲新鮮技術~