天天看點

Citus7.4-Citus 9.3新特性Citus7.4-Citus 9.3新特性

Citus7.4-Citus 9.3新特性

最近開始着手Citus7.4到Citus 9.3的更新,是以比較全面地浏覽了這期間的Citus變更。

從Citus7.4到Citus 9.3很多方面的改進,本文隻列出一些比較重要的部分。

以下用到了一些示例,示例的驗證環境如下

軟體

  • PostreSQL 12
  • Citus 9.3

叢集成員

  • CN
    • 127.0.0.1:9000
  • Worker
    • 127.0.0.1:9001
    • 127.0.0.1:9002

SQL支援增強類

1.支援非分區列的count distinct

這個Citus 7.4應該已經支援了,不知道是Citus的Changelog更新延誤,還是Citus 7.5支援得更完善了。

表定義

create table tb1(id int,c1 int);
select create_distributed_table('tb1','id');           

非分區列的count distinct的執行計劃

postgres=# explain select count(distinct c1) from tb1;
                                        QUERY PLAN                                        
------------------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.01 rows=1 width=8)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=4)
         Task Count: 32
         Tasks Shown: One of 32
         ->  Task
               Node: host=127.0.0.1 port=9001 dbname=postgres
               ->  HashAggregate  (cost=38.25..40.25 rows=200 width=4)
                     Group Key: c1
                     ->  Seq Scan on tb1_102339 tb1  (cost=0.00..32.60 rows=2260 width=4)
(9 rows)           

2.支援UPSERT

支援UPSERT,即支援

INSERT INTO SELECT..ON CONFLICT/RETURNING

create table tb1(id int, c1 int);
select create_distributed_table('tb1','id');
create table tb2(id int primary key, c1 int);
select create_distributed_table('tb2','id');           

UPSERT SQL執行

postgres=# INSERT INTO tb2
  SELECT * from tb1
  ON CONFLICT(id) DO UPDATE SET c1 = EXCLUDED.c1;
INSERT 0 1           

3.支援GENERATED ALWAYS AS STORED

使用示例如下:

create table tbgenstore(id int, c1 int GENERATE ALWAYS AS (id+1)STORED);
select create_distributed_table('tbgenstore','id');           

4.支援使用者定義的分布式函數

支援使用者自定義分布式函數。Citus會把分布式函數(包括聚合函數)以及依賴的對象定義下發到所有Worker上。

後續在執行SQL的時候也可以合理的把分布式函數的執行下推到Worker。

分布式函數還可以和某個分布表綁定"親和"關系,這一個特性的使用場景如下:

在多租戶類型的業務中,把單個租戶的一個事務中的多個SQL打包成一個“分布式函數”下發到Worker上。

CN隻需要下推一次分布式函數的調用,分布式函數内部的多個SQL的執行全部在Worker節點内部完成。

避免CN和Worker之間來回互動,可以大大提升OLTP的性能(利用這個特性去跑TPCC,簡直太溜了!)。

下面看下手冊裡的例子。

https://docs.citusdata.com/en/v9.3/develop/api_udf.html?highlight=distributed%20function#create-distributed-function
-- an example function which updates a hypothetical
-- event_responses table which itself is distributed by event_id
CREATE OR REPLACE FUNCTION
  register_for_event(p_event_id int, p_user_id int)
RETURNS void LANGUAGE plpgsql AS $fn$
BEGIN
  INSERT INTO event_responses VALUES ($1, $2, 'yes')
  ON CONFLICT (event_id, user_id)
  DO UPDATE SET response = EXCLUDED.response;
END;
$fn$;

-- distribute the function to workers, using the p_event_id argument
-- to determine which shard each invocation affects, and explicitly
-- colocating with event_responses which the function updates
SELECT create_distributed_function(
  'register_for_event(int, int)', 'p_event_id',
  colocate_with := 'event_responses'
);           

5.完全支援聚合函數

Citus中對聚合函數有3種不同的執行方式

  1. 按照分片字段分組的聚合,直接下推到Worker執行聚合
  2. 對部分Citus能夠識别的聚合函數,Citus執行兩階段聚合,現在Worker執行部分聚合,再把結果彙總到CN上進行最終聚合。
  3. 對其他的聚合函數,Citus把資料拉到CN上,在CN上執行聚合。

詳細參考,

https://docs.citusdata.com/en/v9.3/develop/reference_sql.html?highlight=Aggregation#aggregate-functions

顯然第3種方式性能會比較差,對不按分片字段分組的聚合,怎麼讓它按第2種方式執行呢?

Citus中預定義了一部分聚合函數可以按第2中方式執行。

citus-9.3.0/src/include/distributed/multi_logical_optimizer.h

:

static const char *const AggregateNames[] = {
    "invalid", "avg", "min", "max",
    "sum", "count", "array_agg",
    "jsonb_agg", "jsonb_object_agg",
    "json_agg", "json_object_agg",
    "bit_and", "bit_or", "bool_and", "bool_or", "every",
    "hll_add_agg", "hll_union_agg",
    "topn_add_agg", "topn_union_agg",
    "any_value"
};           

對不在上面白名單的聚合函數,比如使用者自定義的聚合函數,可以通過

create_distributed_function()

添加。

示例如下:

citus-9.3.0/src/test/regress/expected/aggregate_support.out

create function sum2_sfunc(state int, x int)
returns int immutable language plpgsql as $$
begin return state + x;
end;
$$;
create function sum2_finalfunc(state int)
returns int immutable language plpgsql as $$
begin return state * 2;
end;
$$;
create aggregate sum2 (int) (
    sfunc = sum2_sfunc,
    stype = int,
    finalfunc = sum2_finalfunc,
    combinefunc = sum2_sfunc,
    initcond = '0'
);

select create_distributed_function('sum2(int)');           

執行這個自定義的聚合函數的執行計劃如下

postgres=# explain select sum2(c1) from tb1;
                                        QUERY PLAN                                        
------------------------------------------------------------------------------------------
 Aggregate  (cost=250.00..250.01 rows=1 width=4)
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=32)
         Task Count: 32
         Tasks Shown: One of 32
         ->  Task
               Node: host=127.0.0.1 port=9001 dbname=postgres
               ->  Aggregate  (cost=38.25..38.26 rows=1 width=32)
                     ->  Seq Scan on tb1_102339 tb1  (cost=0.00..32.60 rows=2260 width=4)
(8 rows)           

但是目前這種方式不支援

stype = internal

的自定義聚合函數。

Citus社群已經在對應這個問題,詳細參考

https://github.com/citusdata/citus/issues/3916

6.完全支援視窗函數

對不按分片字段分組的聚合函數,Citus支援把資料拉到CN上再執行,和聚合函數類型。

需要注意這種執行方式對性能的影響,特别是包含多個不同分組字段的視窗函數時,

Worker拉到CN上結果集是這些字段組合的笛卡爾積。

7.支援在事務塊中傳播LOCAL參數

當在CN的事務塊中設定LOCAL參數時,可以把這個參數傳播到Worker節點。

前提條件是

citus.propagate_set_commands

參數必須為local

set citus.propagate_set_commands TO local;           

事務塊中設定LOCAL參數

postgres=# begin;
BEGIN
postgres=*# set local enable_hashagg to off;
SET
postgres=*# SELECT current_setting('enable_hashagg') FROM tb1 WHERE id = 3;
 current_setting 
-----------------
 off
(1 row)           

8. 支援本地表和參考表Join

如果一個資料庫需要用到本地表,而本地表和以參考表的形式部署的維表又有Join的需求,改如何處理?

原來我們隻能在CN上再建立一套本地的維表,然後由應用或者通過觸發器維護兩套維表之間的資料同步。

現在可以用更簡單的方式實作。

具體就是把CN節點也可以作為一個Worker加到Citus叢集裡,groupid一定要設定為0。

SELECT master_add_node('127.0.0.1', 9001, groupid => 0);           

這樣CN上也就和其他Worker一樣擁有了參考表的一個副本,本地表和參考表Join的時候就直接在本地執行了。

DDL支援增強

9.支援把SCHEMA的賦權廣播到Worker上

GRANT USAGE ON SCHEMA dist_schema TO role1;           

10.支援修改表SCHEMA廣播到Worker上

ALTER TABLE ... SET SCHEMA           

11.支援建立索引時指定INCLUDE選項

create index tb1_idx_id on tb1(id) include (c1);           

12. 支援使用

CONCURRENTLY

選項建立索引

create index CONCURRENTLY tb1_idx_id2 on tb1(id);           

13. 支援傳播REINDEX到worker節點上

之前版本reindex不能傳播到Worker節點,還需要到每個worker分别執行reindex。

新版的Citus支援了。

reindex index tb1_idx_id;           

Citus MX功能增強

14.支援在MX 節點上對參考表執行DML

create table tbref(id int, c1 int);
select create_refence_table('tbref');           

在MX worker(即擴充worker)上修改參考表

postgres=# insert into tbref values(1,1),(2,2);
INSERT 0 2
postgres=# update tbref set c1=10;
UPDATE 2
postgres=# delete from tbref where id=1;
DELETE 1
postgres=# select * from tbref;
 id | c1 
----+----
  2 | 10
(1 row)           

15.支援在MX節點上執行TRUNCATE

之前MX節點上是不支援對分布表和參考表執行truncate操作的。現在也支援了

postgres=# truncate tb1;
TRUNCATE TABLE
postgres=# truncate tbref;
TRUNCATE TABLE           

16.支援在Citus MX架構下使用serial和smallserial

之前在Citus MX(即多CN部署)環境下,自增序列隻能使用bigserial類型,現在也可以支援serial和smallserial了。

create table tbserial(id int,c1 int);
select create_distributed_table('tbserial','id');           

Citus中,自增字段通過CN和MX節點上邏輯表上的序列對象實作。

postgres=# \d tbserial
                            Table "public.tbserial"
 Column |  Type   | Collation | Nullable |               Default                
--------+---------+-----------+----------+--------------------------------------
 id     | integer |           | not null | nextval('tbserial_id_seq'::regclass)
 c1     | integer |           |          |            

為了防止多個MX節點産生的序列沖突。在Citus MX環境下,序列值的開頭部分是産生序列的節點的groupid,後面才是順序累加的值。

這等于按groupid把序列值分成了不同的範圍,互不重疊。

即:

全局序列值 = groupid,節點内的順序遞增值           

對不同serial的資料類型,groupid占的位數是不一樣的。具體如下

  • bigserial:16bit
  • serial:4bit
  • smallserial:4bit

根據上groupid占的長度,我們需要注意

  1. 單個節點(CN或擴充Worker)上,能産生的序列值的數量變少了,要防止溢出。
  2. 如果使用了serial或smallserial,最多部署7個擴充Worker節點。

序列對象的定義

上面提到的全局序列的實作具體展現為:在不同節點上,序列對象定義的範圍不一樣。如下

CN節點上的序列對象定義(CN節點的groupid固定為0)

postgres=# \d tbserial_id_seq
                  Sequence "public.tbserial_id_seq"
  Type   | Start | Minimum |  Maximum   | Increment | Cycles? | Cache 
---------+-------+---------+------------+-----------+---------+-------
 integer |     1 |       1 | 2147483647 |         1 | no      |     1
Owned by: public.tbserial.id           

MX Worker節點上的序列對象定義(groupid=1)

postgres=# \d tbserial_id_seq
                    Sequence "public.tbserial_id_seq"
  Type  |   Start   |  Minimum  |  Maximum  | Increment | Cycles? | Cache 
--------+-----------+-----------+-----------+-----------+---------+-------
 bigint | 268435457 | 268435457 | 536870913 |         1 | no      |     1           

如何知道每個Worker節點的groupid?

每個Worker節點的groupid可以從

pg_dist_node

擷取。

postgres=# select * from pg_dist_node;
 nodeid | groupid | nodename  | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards 
--------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------
      2 |       2 | 127.0.0.1 |     9002 | default  | t           | t        | primary  | default     | t              | t
      1 |       1 | 127.0.0.1 |     9001 | default  | t           | t        | primary  | default     | t              | t
(2 rows)           

也可以在每個節點本地查詢

pg_dist_local_group

獲得本節點的groupid。

postgres=# select * from pg_dist_local_group;
 groupid 
---------
       1
(1 row)           

CN節點和普通的Worker節點(非MX Worker)的

pg_dist_local_group

中查詢到的groupid都為0.

17.在Citus MX通過本地執行提升性能

之前測試Citus MX架構的時候發現,當Citus MX節點上放分片時,性能比不放分片差一倍。

新版的Citus在這方面做了優化,當在Citus MX節點上通路本節點上的分片時,不再走建立一個到本地的資料庫連接配接再讀寫分片的正常執行方式。

而是直接用目前連接配接通路分片。根據下面的測試資料,性能可以提升一倍。

https://github.com/citusdata/citus/pull/2938
- Test 1: HammerDB test with 250 users, 1,000,000 transactions per. 8 Node Citus MX 
          - (a) With local execution: `System achieved 116473 PostgreSQL TPM at 160355 NOPM`   
          - (b) without local execution: ` System achieved 61392 PostgreSQL TPM at 100503 NOPM`

  - Test 2: HammerDB test with 250 users, 10,000,000 transactions per. 8 Node Citus MX 
           - (a) With local execution: `System achieved 91921 PostgreSQL TPM at 174557 NOPM`   
           - (b) without local execution: ` System achieved 84186 PostgreSQL TPM at 98408 NOPM`

- Test 3: Pgbench, 1 worker node, -c64 -c256 -T 120
            - (a) Local execution enabled (tps): `select-only`: 56202   `simple-update`:  11771 `tpcb-like`: 7796
            - (a) Local execution disabled (tps): `select-only`:  24524 `simple-update`: 5077  `tpcb-like`:   3510 (some connection errors for tpcb-like)           

在我司的多CN部署方式下,擴充Worker上是不放分片的。是以這個優化和我們無關。

性能增強

18.替換

real-time

為新的執行器

Adaptive Executor

Adaptive Executor

是一個新的執行器,它和

real-time

的差異主要展現在可以通過參數對CN到worker的連接配接數進行控制。具體如下:

  1. citus.max_shared_pool_size

    可以通過`citus.max_shared_pool_size`控制CN(或MX Worker)在單個Worker上可同時建立的最大連接配接數,預設值等于CN的`max_connections`。
    達到連接配接數使用上限後,新的SQL請求可能等待,有些操作不受限制,比如COPY和重分區的Join。
    Citus MX架構下,單個Worker上同時接受到連接配接數最大可能是 `max_shared_pool_size * (1 + MX Worker節點數)`           
  2. citus.max_adaptive_executor_pool_size

    可以通過`citus.max_adaptive_executor_pool_size`控制CN(或MX Worker)上的單個會話在單個Worker上可同時建立的最大連接配接數,預設值等于16。           
  3. citus.max_cached_conns_per_worker

    可以通過`citus.max_cached_conns_per_worker`控制CN(或MX Worker)上的單個會話在事務結束後對每個Worker緩存的連接配接數,預設值等于1。           
  4. citus.executor_slow_start_interval

    對于執行時間很短的多shard的SQL,并發開多個連接配接,不僅頻繁建立銷毀連接配接的消耗很高,也極大的消耗了worker上有限的連接配接資源。
    adaptive執行器,在執行多shard的SQL時,不是一次就建立出所有需要的連接配接數,而是先建立一部分,隔一段時間再建立一部分。
    中途如果有shard的任務提前完成了,它的連接配接可以被複用,就可以減少對建立連接配接的需求。
    是以執行多shard的SQL最少隻需要一個連接配接,最多不超過`max_adaptive_executor_pool_size`,當然也不會超過目标worker上的shard數。
    
    這個算法叫"慢啟動",慢啟動的間隔由參數`citus.executor_slow_start_interval`控制,預設值為10ms。
    初始建立的連接配接數是:max(1,`citus.max_cached_conns_per_worker`),之後每批建立的連接配接數都在前一批的基礎上加1。
    即預設情況下,每批建立的連接配接數依次為1,2,3,4,5,6...
    
    "慢啟動"主要優化了短查詢,對長查詢(手冊上給的标準是大于500ms),會增加一定的響應時間。
    
               

下面看幾個例子

citus.max_shared_pool_size

的使用示例

postgres=# alter system set citus.max_shared_pool_size to 4;
ALTER SYSTEM
postgres=# select pg_reload_conf();
 pg_reload_conf 
----------------
 t
(1 row)
postgres=# begin;
BEGIN
postgres=*# update tb1 set c1=11;
UPDATE 1
postgres=*# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        4
 127.0.0.1 | 9001 | postgres      |                        4
(2 rows)           

citus.executor_slow_start_interval

tb1總共有32個分片,每個worker上有16個分片。

初始每個worker上保持2個連接配接

postgres=# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        2
 127.0.0.1 | 9001 | postgres      |                        2
(2 rows)           

citus.executor_slow_start_interval = '10ms'

時,執行一個空表的update,隻額外建立了2個新連接配接。

postgres=# set citus.executor_slow_start_interval='10ms';
SET
postgres=# begin;
BEGIN
postgres=*# update tb1 set c1=100;
UPDATE 0
postgres=*# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        4
 127.0.0.1 | 9001 | postgres      |                        4
(2 rows)           

citus.executor_slow_start_interval = '500ms'

時,沒有建立新的連接配接,都複用了一個緩存的連接配接

postgres=# set citus.executor_slow_start_interval='500ms';
SET
postgres=# begin;
BEGIN
postgres=*# update tb1 set c1=100;
UPDATE 0
postgres=*# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        2
 127.0.0.1 | 9001 | postgres      |                        2
(2 rows)           

citus.executor_slow_start_interval = '0ms'

時,建立了比較多的新連接配接。

postgres=# set citus.executor_slow_start_interval = '0ms';
SET
postgres=# begin;
BEGIN
postgres=*# update tb1 set c1=100;
UPDATE 0
postgres=*# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        5
 127.0.0.1 | 9001 | postgres      |                       14
(2 rows)           

參考

adaptive執行器連接配接建立"慢啟動"的代碼參考:

citus-9.3.0/src/backend/distributed/executor/adaptive_executor.c:

static void
ManageWorkerPool(WorkerPool *workerPool)
{
...
        /* cannot open more than targetPoolSize connections */
        int maxNewConnectionCount = targetPoolSize - initiatedConnectionCount;//targetPoolSize的值為max(1,`citus.max_cached_conns_per_worker`)

        /* total number of connections that are (almost) available for tasks */
        int usableConnectionCount = UsableConnectionCount(workerPool);

        /*
         * Number of additional connections we would need to run all ready tasks in
         * parallel.
         */
        int newConnectionsForReadyTasks = readyTaskCount - usableConnectionCount;

        /*
         * Open enough connections to handle all tasks that are ready, but no more
         * than the target pool size.
         */
        newConnectionCount = Min(newConnectionsForReadyTasks, maxNewConnectionCount);

        if (newConnectionCount > 0 && ExecutorSlowStartInterval != SLOW_START_DISABLED)
        {
            if (MillisecondsPassedSince(workerPool->lastConnectionOpenTime) >=
                ExecutorSlowStartInterval)
            {
                newConnectionCount = Min(newConnectionCount,
                                         workerPool->maxNewConnectionsPerCycle);

                /* increase the open rate every cycle (like TCP slow start) */
                workerPool->maxNewConnectionsPerCycle += 1;
            }
            else
            {
                /* wait a bit until opening more connections */
                return;
            }
        }           

19.通過adaptive執行器執行重分布的Join

citus.enable_repartition_joins=on

時,Citus支援通過資料重分布的方式執行非親和Inner Join,

之前版本Citus會自動切換到

task-tracker

執行器執行重分布的Join,但是使用

task-tracker

執行器需要CN節點給Worker下發任務再不斷檢查任務完成狀态,其額外消耗很大,響應時間非常長。

新版Citus改進後,可以通過adaptive執行器執行重分布的Join。

根據官網部落格,1000w以下資料的重分布Join,性能提升了10倍。

詳細參考:

https://www.citusdata.com/blog/2020/03/02/citus-9-2-speeds-up-large-scale-htap/

我們自己的簡單測試中,2張空表的重分布Join,之前需要16秒,現在隻需要2秒。

20.支援重分布的方式執行

INSERT...SELECT

create table tb1(id int, c1 int);
select create_distributed_table('tb1','id');
set citus.shard_count to 16;
create table tb2(id int primary key, c1 int);
select create_distributed_table('tb2','id');           

tb1和tb2的分片數不一樣,即它們不是親和的。

此前,Citus必須把資料全拉到CN節點上中轉。

新版Citus可以通過重分布的方式執行這個SQL,各個Worker之間直接互相傳送資料,CN節點隻執行工具函數驅動任務執行,性能可大幅提升。

postgres=# explain INSERT INTO tb2
  SELECT * from tb1
  ON CONFLICT(id) DO UPDATE SET c1 = EXCLUDED.c1;
                                     QUERY PLAN                                     
------------------------------------------------------------------------------------
 Custom Scan (Citus INSERT ... SELECT)  (cost=0.00..0.00 rows=0 width=0)
   INSERT/SELECT method: repartition
   ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=100000 width=8)
         Task Count: 32
         Tasks Shown: One of 32
         ->  Task
               Node: host=127.0.0.1 port=9001 dbname=postgres
               ->  Seq Scan on tb1_102339 tb1  (cost=0.00..32.60 rows=2260 width=8)
(8 rows)           

根據官網部落格,這項優化使性能提升了5倍。

需要注意的是,如果插入時,需要在目标表上自動生成自增字段,Citus會退回到原來的執行方式,資料都會經過CN中轉一下。

21.支援以輪詢的方式通路參考表的多個副本

之前Citus查詢參考表時,始終隻通路參考表的第一個副本,新版Citus可以通過參數設定,在參考表多個副本輪詢通路,均衡負載。

postgres=# set citus.task_assignment_policy TO "round-robin";
SET
postgres=# explain select * from tbref;
                                    QUERY PLAN                                    
----------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=127.0.0.1 port=9001 dbname=postgres
         ->  Seq Scan on tbref_102371 tbref  (cost=0.00..32.60 rows=2260 width=8)
(6 rows)

postgres=# explain select * from tbref;
                                    QUERY PLAN                                    
----------------------------------------------------------------------------------
 Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
   Task Count: 1
   Tasks Shown: All
   ->  Task
         Node: host=127.0.0.1 port=9002 dbname=postgres
         ->  Seq Scan on tbref_102371 tbref  (cost=0.00..32.60 rows=2260 width=8)
(6 rows)           

citus.task_assignment_policy

的預設值是greedy。greedy比較适合多副本的分布表。

對于涉及多個shard的SQL,每個shard都有多個可選的副本,在greedy政策下,

Citus會盡量確定每個worker配置設定到任務數相同。

具體實作時Citus一次輪詢所有Worker,直到把所有shard任務都配置設定完。

是以對參考表這種隻有一個shard的場景,greedy會導緻其始終把任務配置設定給第一個worker。

詳細可以參考

GreedyAssignTaskList()

函數的代碼。

22.表資料導出優化

Citus導出資料時,中間結果會寫到在CN上,而且CN從Worker拉資料是并行拉的,不過Worker還是CN負載都會很高。

新版Citus優化了COPY導出處理,依次從每個Worker上抽出數傳回給用戶端,中途資料不落盤。

但是這一優化隻适用于下面這種固定形式的全表COPY到STDOUT的場景

COPY table tb1 to STDOUT           

這可以大大優化

pg_dump

,延遲更低,記憶體使用更少。

叢集管理增強

23.支援控制worker不配置設定shard

可以通過設定節點的shouldhaveshards屬性控制某個節點不放分片。

SELECT master_set_node_property('127.0.0.1', 9002, 'shouldhaveshards', false);           

shouldhaveshards屬性會對後續建立新的分布表和參考表生效。

也會對後續執行的企業版Citus的rebalance功能生效,社群版不支援rebalance,但如果自研Citus部署和維護工具也可以利用這個參數。

  • 擴充Worker的實作邏輯改為使用這個參數,簡化處理邏輯,不用先建好分布表後再挪分片。
  • 擴縮容腳本也可以使用這個參數決定Worker上是否放置分片,不需要區分是不是

    全部是擴充Worker

    的部署架構

24.支援使用

master_update_node

實施failover

采用主備流複制實作Worker高可用時,一般CN通過VIP通路Worker,worker主備切換時隻需要漂移vip到新的主節點即可。

新版Citus提供了一個新的可選方案,通過

master_update_node()

函數修改某個worker的IP和Port。

這提供了一種新的不依賴VIP的Worker HA實作方案。

postgres=# \df master_update_node
List of functions
-[ RECORD 1 ]-------+-----------------------------------------------------------------------------------------------------------------------------
Schema              | pg_catalog
Name                | master_update_node
Result data type    | void
Argument data types | node_id integer, new_node_name text, new_node_port integer, force boolean DEFAULT false, lock_cooldown integer DEFAULT 10000
Type                | func
           

25.支援變更親和定義

新版Citus可以在分布表建立後,修改親和關系。

create table tba(id int,c1 int);
select create_distributed_table('tba','id');
create table tbb(id int,c1 int);
select create_distributed_table('tbb','id');
create table tbc(id text,c1 int);
select create_distributed_table('tbc','id');           

tba和tbb這兩個表是親和的

postgres=# select * from pg_dist_partition where logicalrelid in ('tba'::regclass,'tbb'::regclass);
 logicalrelid | partmethod |                                                        partkey                                                         | colocationid | repmodel 
--------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+----------
 tba          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |            3 | s
 tbb          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |            3 | s
(2 rows)           

将tbb設定為新的親和ID,打破它們的親和關系

postgres=# SELECT update_distributed_table_colocation('tbb', colocate_with => 'none');
 update_distributed_table_colocation 
-------------------------------------
 
(1 row)

postgres=# select * from pg_dist_partition where logicalrelid in ('tba'::regclass,'tbb'::regclass);
 logicalrelid | partmethod |                                                        partkey                                                         | colocationid | repmodel 
--------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+----------
 tba          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |            3 | s
 tbb          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |           14 | s
(2 rows)           

重新設定它們親和

postgres=# SELECT update_distributed_table_colocation('tbb', colocate_with => 'tba');
 update_distributed_table_colocation 
-------------------------------------
 
(1 row)

postgres=# select * from pg_dist_partition where logicalrelid in ('tba'::regclass,'tbb'::regclass);
 logicalrelid | partmethod |                                                        partkey                                                         | colocationid | repmodel 
--------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+----------
 tba          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |            3 | s
 tbb          | h          | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} |            3 | s
(2 rows)           

也可以用批量将一組表設定為和某一個表親和

postgres=# SELECT mark_tables_colocated('tba', ARRAY['tbb', 'tbc']);
ERROR:  cannot colocate tables tba and tbc
DETAIL:  Distribution column types don't match for tba and tbc.           

tbc的分片字段類型不一緻,不能親和,去掉tbc再次執行成功。

postgres=# SELECT mark_tables_colocated('tba', ARRAY['tbb']);
 mark_tables_colocated 
-----------------------
 
(1 row)           

26.支援truncate分布表的本地資料

把一個原來就有資料的本地表建立成分布表,會把原來的資料拷貝到各個shard上,但原始本地表上的資料不會删除,隻是對使用者不可見。

原來沒有直接的辦法删掉這些不需要的本地資料(可以通過臨時篡改中繼資料的方式删),現在可以用一個函數實作。

SELECT truncate_local_data_after_distributing_table('tb1');           

27. 延遲複制參考表副本

當新的Worker節點添加到Citus叢集的時候,會同步參考表的副本到上面。

如果叢集中存在比較大參考表,會導緻添加Worker節點的時間不可控。

這可能使得使用者不敢在業務高峰期擴容節點。

現在Citus可以支援把參考表的同步延遲到下次建立分片的的時候。

方法就是設定下面這個參數為off,它的預設值為on。

citus.replicate_reference_tables_on_activate = off           

這樣我們可以在白天擴容,夜裡在背景同步資料。

28.建立叢集範圍一緻的恢複點

之前我們備份Citus叢集的時候,都是各個節點各自備份恢複,真發生故障,沒辦法恢複到一個叢集範圍的一緻點。

現在可以使用下面的函數,建立一個全局的恢複點實行全局一緻性備份。

使用方法類似于PG的pg_create_restore_point(),詳細可參考手冊。

select citus_create_restore_point('foo');           

29.支援設定Citus叢集節點間互聯的連接配接選項

可以通過

citus.node_conninfo

參數設定Citus内節點間互連的一些非敏感的連接配接選項。

支援連接配接選項下面的libpq的一個子集。

  • application_name
  • connect_timeout
  • gsslib
  • keepalives
  • keepalives_count
  • keepalives_idle
  • keepalives_interval
  • krbsrvname
  • sslcompression
  • sslcrl
  • sslmode (defaults to “require” as of Citus 8.1)
  • sslrootcert

Citus 8.1以後,在支援SSL的PostgreSQL上,

citus.node_conninfo

的預設值為'sslmode=require'。

即預設開啟了SSL。這是Citus出于安全的考慮,但是啟用SSL後部署和維護會比較麻煩。

是以我們的部署環境下,需要将其修改為

sslmode=prefer

postgres=# show citus.node_conninfo;
 citus.node_conninfo 
---------------------
 sslmode=prefer
(1 row)           

30.預設關閉Citus統計收集

之前Citus的守護程序預設會收集Citus叢集的一些中繼資料資訊上報到CitusData公司的服務上(明顯有安全問題)。

新版本把這個功能預設關閉了。當然更徹底的做法是在編譯Citus的時候就把這個功能屏蔽掉。

postgres=# show citus.enable_statistics_collection;
 citus.enable_statistics_collection 
------------------------------------
 off
(1 row)           

31. 增加檢視叢集範圍活動的函數和視圖

新版Citus提供了幾個函數和視圖,可以在CN上非常友善的檢視整體Citus的目前活動狀況

  • citus_remote_connection_stats()

檢視所有worker上的來自CN節點和MX Worker節點的遠端連接配接數。

postgres=# select * from citus_remote_connection_stats();
 hostname  | port | database_name | connection_count_to_node 
-----------+------+---------------+--------------------------
 127.0.0.1 | 9002 | postgres      |                        3
 127.0.0.1 | 9001 | postgres      |                        3
(2 rows)           
  • citus_dist_stat_activity

檢視從本CN節點或MX worker上發起的活動。這個視圖在

pg_stat_activity

上附加了一些Citus相關的資訊。

postgres=# select * from citus_dist_stat_activity;
-[ RECORD 1 ]----------+------------------------------
query_hostname         | coordinator_host
query_hostport         | 9000
master_query_host_name | coordinator_host
master_query_host_port | 9000
transaction_number     | 57
transaction_stamp      | 2020-06-19 15:05:22.142242+08
datid                  | 13593
datname                | postgres
pid                    | 2574
usesysid               | 10
usename                | postgres
application_name       | psql
client_addr            | 
client_hostname        | 
client_port            | -1
backend_start          | 2020-06-19 10:57:58.472994+08
xact_start             | 2020-06-19 15:05:17.45487+08
query_start            | 2020-06-19 15:05:22.140954+08
state_change           | 2020-06-19 15:05:22.140957+08
wait_event_type        | Client
wait_event             | ClientRead
state                  | active
backend_xid            | 
backend_xmin           | 5114
query                  | select * from tb1;
backend_type           | client backend           

注意上面的

transaction_number

,它代表一個事務号。

涉及更新的SQL,事務塊中查詢和

push-pull

方式執行的查詢都會配置設定一個非0的事務号。

通過這個事務号,我們可以很容易地識别出所有worker上來自同一SQL(或事務)的活動。

詳細參考下面的注釋。(這段注釋應該寫錯了,下面2類SQL的差別不是是否能被'show',而是

transaction_number

是否非0)

citus-9.3.0/src/backend/distributed/transaction/citus_dist_stat_activity.c

*  An important note on this views is that they only show the activity
 *  that are inside distributed transactions. Distributed transactions
 *  cover the following:
 *     - All multi-shard modifications (DDLs, COPY, UPDATE, DELETE, INSERT .. SELECT)
 *     - All multi-shard queries with CTEs (modifying CTEs, read-only CTEs)
 *     - All recursively planned subqueries
 *     - All queries within transaction blocks (BEGIN; query; COMMMIT;)
 *
 *  In other words, the following types of queries won't be observed in these
 *  views:
 *      - Single-shard queries that are not inside transaction blocks
 *      - Multi-shard select queries that are not inside transaction blocks
 *      - Task-tracker queries           
  • citus_worker_stat_activity

檢視所有worker上的活動。排除非citus會話,即不經過CN或MX worker直連worker的會話。

我們可以指定

transaction_number

檢視特定SQL在worker上的活動。

postgres=# select * from citus_worker_stat_activity where transaction_number = 57;
-[ RECORD 1 ]----------+---------------------------------------------
query_hostname         | 127.0.0.1
query_hostport         | 9001
master_query_host_name | coordinator_host
master_query_host_port | 9000
transaction_number     | 57
transaction_stamp      | 2020-06-19 15:05:22.142242+08
datid                  | 13593
datname                | postgres
pid                    | 4108
usesysid               | 10
usename                | postgres
application_name       | citus
client_addr            | 127.0.0.1
client_hostname        | 
client_port            | 33676
backend_start          | 2020-06-19 15:05:22.162829+08
xact_start             | 2020-06-19 15:05:22.168811+08
query_start            | 2020-06-19 15:05:22.171398+08
state_change           | 2020-06-19 15:05:22.172237+08
wait_event_type        | Client
wait_event             | ClientRead
state                  | idle in transaction
backend_xid            | 
backend_xmin           | 
query                  | SELECT id, c1 FROM tb1_102369 tb1 WHERE true
backend_type           | client backend
...           
  • citus_lock_waits

檢視Citus叢集内的被阻塞的查詢。下面引用Ciuts手冊上的例子

CREATE TABLE numbers AS
  SELECT i, 0 AS j FROM generate_series(1,10) AS i;
SELECT create_distributed_table('numbers', 'i');           

使用2個會話終端,順序執行下面的SQL。

-- session 1                           -- session 2
-------------------------------------  -------------------------------------
BEGIN;
UPDATE numbers SET j = 2 WHERE i = 1;
                                       BEGIN;
                                       UPDATE numbers SET j = 3 WHERE i = 1;
                                       -- (this blocks)           

通過

citus_lock_waits

可以看到,這2個查詢是阻塞狀态。

SELECT * FROM citus_lock_waits;
-[ RECORD 1 ]-------------------------+----------------------------------------
waiting_pid                           | 88624
blocking_pid                          | 88615
blocked_statement                     | UPDATE numbers SET j = 3 WHERE i = 1;
current_statement_in_blocking_process | UPDATE numbers SET j = 2 WHERE i = 1;
waiting_node_id                       | 0
blocking_node_id                      | 0
waiting_node_name                     | coordinator_host
blocking_node_name                    | coordinator_host
waiting_node_port                     | 5432
blocking_node_port                    | 5432           

這個視圖隻能在CN節點檢視,MX worker節點查不到資料。但是并不要求阻塞所涉及的SQL必須從CN節點發起。

https://docs.citusdata.com/en/v9.3/develop/api_metadata.html?highlight=citus_worker_stat_activity#distributed-query-activity

32. 增加檢視表中繼資料的函數和視圖

  • master_get_table_metadata()

檢視分布表的中繼資料

postgres=# select * from master_get_table_metadata('tb1');
-[ RECORD 1 ]---------+-----------
logical_relid         | 17148
part_storage_type     | t
part_method           | h
part_key              | id
part_replica_count    | 1
part_max_size         | 1073741824
part_placement_policy | 2           
  • get_shard_id_for_distribution_column()

檢視某個分布列值對應的shardid

postgres=# SELECT get_shard_id_for_distribution_column('tb1', 4);
 get_shard_id_for_distribution_column 
--------------------------------------
                               102347
(1 row)           

其他

33. 允許在CN備庫執行簡單的DML

通過設定

citus.writable_standby_coordinator

參數為on,可以在CN的備庫上執行部分簡單的DML。

看下下面的例子

create table tbl(id int,c1 int);
select create_distributed_table('tbserial','id');           

在CN備節點上可以執行帶分片字段的DML

postgres=# insert into tb1 values(3,3);
ERROR:  writing to worker nodes is not currently allowed
DETAIL:  the database is in recovery mode 
postgres=# set citus.writable_standby_coordinator TO ON;
SET
postgres=# insert into tb1 values(3,3);
INSERT 0 1
postgres=# update tb1 set c1=20 where id=3;
UPDATE 1
postgres=# delete from tb1 where id=3;
DELETE 1           

不支援不帶分片字段的UPDATE和DELETE

postgres=# update tb1 set c1=20;
ERROR:  cannot assign TransactionIds during recovery
postgres=# delete from tb1 where c1=20;
ERROR:  cannot assign TransactionIds during recovery           

也不支援跨節點的事務

postgres=# begin;
BEGIN
postgres=*# insert into tb1 values(3,3);
INSERT 0 1
postgres=*# insert into tb1 values(4,4);
INSERT 0 1
postgres=*# commit;
ERROR:  cannot assign TransactionIds during recovery           

對于2pc的分布式事務,Citus需要将事務資訊記錄到事務表

pg_dist_transaction

中。

是以,Citus也無法在CN備節點上支援2pc的分布式事務。

但是如果切換成1pc送出模式,還是可以支援跨節點事務的。

postgres=# set citus.multi_shard_commit_protocol TO '1pc';
SET
postgres=# begin;
BEGIN
postgres=*# insert into tb1 values(4,4);
INSERT 0 1
postgres=*# insert into tb1 values(5,5);
INSERT 0 1
postgres=*# commit;           

并且在1pc送出模式下,跨多個分片的SQL也是支援的。

postgres=# set citus.multi_shard_commit_protocol TO '1pc';
SET
postgres=# update tb1 set c1=10;
UPDATE 3