
PostgreSQL sharding extension citus optimizer Query Processing - Subquery/CTE Push-Pull Execution

Tags

PostgreSQL , citus , sharding , push , pull , optimizer

Background

Citus is an open-source sharding extension for PostgreSQL. The company behind it was acquired by Microsoft in 2018, and the extension remains open source.

To handle very complex SQL, Citus uses a push-pull model that exchanges data across nodes.

The push and pull of intermediate results works as follows:

push : shard -> coordinator

pull : coordinator -> worker (a single worker hosts multiple shards; the shards share the FILE pulled from the coordinator)

Shards, the coordinator, and workers exchange data using COPY. Intermediate results are stored as FILEs, and the node executor accesses these FILEs (the intermediate results) through the read_intermediate_result function.
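
For reference, here is a minimal sketch of how a worker-side fragment accesses such a FILE: the executor calls the read_intermediate_result function in a Function Scan. The result id '6_1' matches the distributed subplan that appears in the example plan later in this post; the 'binary' format argument is an assumption and may differ in practice.

-- A sketch only: read the intermediate result that the coordinator pulled  
-- to this worker and stored as a FILE ('6_1' and 'binary' as noted above).  
SELECT page_id  
FROM read_intermediate_result('6_1', 'binary')  
     AS intermediate_result(page_id integer);  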

Example

If necessary Citus can gather results from subqueries and CTEs into the coordinator node and then push them back across workers for use by an outer query. This allows Citus to support a greater variety of SQL constructs, and even mix executor types between a query and its subqueries.

For example, having subqueries in a WHERE clause sometimes cannot execute inline at the same time as the main query, but must be done separately. Suppose a web analytics application maintains a visits table partitioned by page_id. To query the number of visitor sessions on the top twenty most visited pages, we can use a subquery to find the list of pages, then an outer query to count the sessions.

In the following query, the subquery must be executed on its own, and its result is then pushed to the outer query.

SELECT page_id, count(distinct session_id)  
FROM visits  
WHERE page_id IN (  
  SELECT page_id  
  FROM visits  
  GROUP BY page_id  
  ORDER BY count(*) DESC  
  LIMIT 20  
)  
GROUP BY page_id;  
           

The real-time executor would like to run a fragment of this query against each shard by page_id, counting distinct session_ids, and combining the results on the coordinator. However, the LIMIT in the subquery means the subquery cannot be executed as part of the fragment. By recursively planning the query, Citus can run the subquery separately, push the results to all workers, run the main fragment query, and pull the results back to the coordinator. This "push-pull" design supports subqueries like the one above.
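
Conceptually, after recursive planning the statement behaves as if the IN (...) subquery had been replaced by a scan of the pushed intermediate result. The following is only an illustrative sketch, not the literal SQL Citus generates; the subplan id '6_1' is taken from the plan below, and the 'binary' format argument is an assumption.

-- Step 1: the subquery runs on its own; its output becomes intermediate result '6_1'.  
-- Step 2: the outer query runs with the subquery replaced by a read of '6_1'.  
SELECT page_id, count(distinct session_id)  
FROM visits  
WHERE page_id IN (  
  SELECT page_id  
  FROM read_intermediate_result('6_1', 'binary')  
       AS intermediate_result(page_id integer)  
)  
GROUP BY page_id;  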

Let’s see this in action by reviewing the EXPLAIN output for this query. It’s fairly involved:

The complete execution plan:

GroupAggregate  (cost=0.00..0.00 rows=0 width=0)  
  Group Key: remote_scan.page_id  
  ->  Sort  (cost=0.00..0.00 rows=0 width=0)  
    Sort Key: remote_scan.page_id  
    ->  Custom Scan (Citus Real-Time)  (cost=0.00..0.00 rows=0 width=0)  
      ->  Distributed Subplan 6_1  
        ->  Limit  (cost=0.00..0.00 rows=0 width=0)  
          ->  Sort  (cost=0.00..0.00 rows=0 width=0)  
            Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC  
            ->  HashAggregate  (cost=0.00..0.00 rows=0 width=0)  
              Group Key: remote_scan.page_id  
              ->  Custom Scan (Citus Real-Time)  (cost=0.00..0.00 rows=0 width=0)  
                Task Count: 32  
                Tasks Shown: One of 32  
                ->  Task  
                  Node: host=localhost port=5433 dbname=postgres  
                  ->  Limit  (cost=1883.00..1883.05 rows=20 width=12)  
                    ->  Sort  (cost=1883.00..1965.54 rows=33017 width=12)  
                      Sort Key: (count(*)) DESC  
                      ->  HashAggregate  (cost=674.25..1004.42 rows=33017 width=12)  
                        Group Key: page_id  
                        ->  Seq Scan on visits_102264 visits  (cost=0.00..509.17 rows=33017 width=4)  
      Task Count: 32  
      Tasks Shown: One of 32  
      ->  Task  
        Node: host=localhost port=5433 dbname=postgres  
        ->  HashAggregate  (cost=734.53..899.61 rows=16508 width=8)  
          Group Key: visits.page_id, visits.session_id  
          ->  Hash Join  (cost=17.00..651.99 rows=16508 width=8)  
            Hash Cond: (visits.page_id = intermediate_result.page_id)  
            ->  Seq Scan on visits_102264 visits  (cost=0.00..509.17 rows=33017 width=8)  
            ->  Hash  (cost=14.50..14.50 rows=200 width=4)  
              ->  HashAggregate  (cost=12.50..14.50 rows=200 width=4)  
                Group Key: intermediate_result.page_id  
                ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..10.00 rows=1000 width=4)  
           

Let’s break it apart and examine each piece.

GroupAggregate  (cost=0.00..0.00 rows=0 width=0)  
  Group Key: remote_scan.page_id  
  ->  Sort  (cost=0.00..0.00 rows=0 width=0)  
    Sort Key: remote_scan.page_id  
           

This is the root of the plan, corresponding to the outermost GROUP BY page_id; note that it uses a GroupAggregate rather than a HashAggregate.

The root of the tree is what the coordinator node does with the results from the workers. In this case it is grouping them, and GroupAggregate requires they be sorted first.

->  Custom Scan (Citus Real-Time)  (cost=0.00..0.00 rows=0 width=0)  
      ->  Distributed Subplan 6_1  

Because of the subquery, the whole statement is split into two independently executed parts; the first part is the subquery.

The custom scan has two large sub-trees, starting with a “distributed subplan.”

->  Limit  (cost=0.00..0.00 rows=0 width=0)  
          ->  Sort  (cost=0.00..0.00 rows=0 width=0)  
            Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC  
            ->  HashAggregate  (cost=0.00..0.00 rows=0 width=0)  
              Group Key: remote_scan.page_id  
              ->  Custom Scan (Citus Real-Time)  (cost=0.00..0.00 rows=0 width=0)  
                Task Count: 32  
                Tasks Shown: One of 32  
                ->  Task  
                  Node: host=localhost port=5433 dbname=postgres  
                  ->  Limit  (cost=1883.00..1883.05 rows=20 width=12)  
                    ->  Sort  (cost=1883.00..1965.54 rows=33017 width=12)  
                      Sort Key: (count(*)) DESC  
                      ->  HashAggregate  (cost=674.25..1004.42 rows=33017 width=12)  
                        Group Key: page_id  
                        ->  Seq Scan on visits_102264 visits  (cost=0.00..509.17 rows=33017 width=4)  

This is the execution plan for the subquery. Each shard computes the counts with a per-shard GROUP BY; the coordinator then combines those partial counts with sum + GROUP BY to obtain the final counts and applies LIMIT 20. The output of this first independent execution step becomes the intermediate results.

Worker nodes run the above for each of the thirty-two shards (Citus is choosing one representative for display). We can recognize all the pieces of the IN (…) subquery: the sorting, grouping and limiting. When all workers have completed this query, they send their output back to the coordinator which puts it together as “intermediate results.”
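
The per-shard fragment is roughly equivalent to the following sketch, reconstructed from the task shown above (visits_102264 is the shard displayed in the plan):

-- Roughly what each worker runs against one of the 32 shards for the subquery:  
SELECT page_id, count(*)  
FROM visits_102264  
GROUP BY page_id  
ORDER BY count(*) DESC  
LIMIT 20;  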

Task Count: 32  
      Tasks Shown: One of 32  
      ->  Task  
        Node: host=localhost port=5433 dbname=postgres  
        ->  HashAggregate  (cost=734.53..899.61 rows=16508 width=8)  
          Group Key: visits.page_id, visits.session_id  
          ->  Hash Join  (cost=17.00..651.99 rows=16508 width=8)  
            Hash Cond: (visits.page_id = intermediate_result.page_id)  

This is the second independently executed part of the query. The coordinator sends the intermediate results to the workers using the PostgreSQL COPY protocol (where they are stored as FILEs); the read_intermediate_result function then loads the intermediate results from the FILE so they can be used in the JOIN.

Citus starts another real-time job in this second subtree. It’s going to count distinct sessions in visits. It uses a JOIN to connect with the intermediate results. The intermediate results will help it restrict to the top twenty pages.

->  Seq Scan on visits_102264 visits  (cost=0.00..509.17 rows=33017 width=8)  
            ->  Hash  (cost=14.50..14.50 rows=200 width=4)  
              ->  HashAggregate  (cost=12.50..14.50 rows=200 width=4)  
                Group Key: intermediate_result.page_id  
                ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..10.00 rows=1000 width=4)  

Still within the second independently executed part of the query: this is the hash join performed on each shard.

The worker internally retrieves intermediate results using a read_intermediate_result function which loads data from a file that was copied in from the coordinator node.
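
Putting these pieces together, the fragment each worker runs in this second step looks roughly like the sketch below (based on the plan above; the subplan id '6_1' comes from the plan, while the 'binary' format argument is an assumption):

-- A sketch of the second-step fragment: join the shard against the intermediate  
-- result and emit distinct (page_id, session_id) pairs; the coordinator then  
-- counts the distinct sessions per page.  
SELECT page_id, session_id  
FROM visits_102264 visits  
WHERE page_id IN (  
  SELECT page_id  
  FROM read_intermediate_result('6_1', 'binary')  
       AS intermediate_result(page_id integer)  
)  
GROUP BY page_id, session_id;  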

This example showed how Citus executed the query in multiple steps with a distributed subplan, and how you can use EXPLAIN to learn about distributed query execution.

Summary

When a complex SQL statement cannot be completed within a single worker executor process, Citus uses the push-pull approach to execute the SQL in multiple stages, which lets it support a wider range of complex SQL.

For example, the subquery discussed in this post is in fact an independent execution step: the subquery runs on its own, and its output is sent to the coordinator via the PostgreSQL COPY protocol as the intermediate results. Before the next independent execution step, the intermediate results are sent from the coordinator to the worker nodes (again via COPY) and stored as temporary FILEs on the workers; in that next step, the read_intermediate_result function reads these FILEs (a PostgreSQL Function Scan) and makes them available to the query.

References

http://docs.citusdata.com/en/v8.1/develop/reference_processing.html#citus-query-processing

《PostgreSQL sharding : citus series 7 - topn acceleration (count(*) group by order by count(*) desc limit x) (using the topn estimation extension)》
《PostgreSQL sharding : citus series 6 - count(distinct xx) acceleration (using the hll|hyperloglog estimation extension)》
《PostgreSQL sharding : citus series 5 - worker node network optimization》
《PostgreSQL sharding : citus series 4 - DDL operation conventions (adding DBs, TABLEs, SCHEMAs, UDFs, OPs, users, etc.)》
《PostgreSQL 11 similar-image search extension imgsmlr performance testing and optimization 3 - citus, 8 machines, 128 shards (400 million images)》
《Deepgreen(Greenplum) multi-machine deployment test, TPC-H VS citus》
《PostgreSQL sharding : citus series 3 - window function restrictions and workarounds (borrowing the gpdb execution tree, stepwise execution)》
《PostgreSQL sharding : citus series 2 - TPC-H》
《PostgreSQL citus, Greenplum distributed execution plan DEBUG》
《PostgreSQL sharding : citus series 1 - multi-machine deployment (including OLTP (TPC-B) testing)》

