天天看點

Greenplum資料庫資料分片政策Hash分布——執行器行為

Greenplum資料庫Hash分布執行器部分主要涉及Motion、Result和SplictUpdate節點。以使用CdbHash *makeCdbHash(int numsegs, int natts, Oid *hashfuncs)建立一個 CdbHash 結構體、cdbhashinit()執行初始化操作,僅僅是初始化hash初始值、cdbhash()函數會調用hashDatum()針對不同類型做不同的預處理,最後将處理後的列值添加到哈希計算中、cdbhashreduce() 映射哈希值到某個 segment為脈絡學習以下執行器對Hash分布的處理。

Motion

隻有當Motion類型為MOTIONTYPE_HASH且執行發送任務(MOTIONSTATE_SEND)的後端才可能涉及Hash分布處理(​

​motionstate->mstype == MOTIONSTATE_SEND && node->motionType == MOTIONTYPE_HASH​

​)。也就是說後端程序需要将處理的資料直接發送給其他後端程序,且這個接收後端可以通過分布鍵資料進行計算哈希值、映射segment後定位到。其執行堆棧為ExecInitNode --> ExecInitMotion --> makeCdbHash。

Greenplum資料庫資料分片政策Hash分布——執行器行為

涉及hash的motion執行流程堆棧如下ExecMotion --> execMotionSender --> doSendTuple --> evalHashkey(nodeMotion.c) --> cdbhashinit和cdbhash。調用doSendTuple發送tuple的情況下,當motion類型為MOTIONTYPE_HASH,就需要計算出該segment的index,然後設定到targetRoute變量中。計算的函數就是evalHashkey。

Greenplum資料庫資料分片政策Hash分布——執行器行為

如果向evalHashKey函數傳入分布鍵清單hashkeys則使用CdbHash計算哈希值、映射segment,如果沒有傳入分布鍵,直接随機選取segment。CdbHash計算哈希值、映射segment的流程在Greenplum資料庫Hash分布——計算哈希值和映射segment描述過,這裡就不做叙述了。

uint32 evalHashKey(ExprContext *econtext, List *hashkeys, CdbHash * h) {
  ListCell   *hk; unsigned int target_seg;
  ResetExprContext(econtext);
  MemoryContext oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); // 切換到ecxt_per_tuple_memory
  
  if (list_length(hashkeys) > 0){ /* If we have 1 or more distribution keys for this relation, hash them. However, If this happens to be a relation with an empty policy (partitioning policy with a NULL distribution key list) then we have no hash key value to feed in, so use cdbhashrandomseg() to pick a segment at random. */
    int      i = 0;
    cdbhashinit(h);
    foreach(hk, hashkeys){
      ExprState  *keyexpr = (ExprState *) lfirst(hk);
      Datum    keyval; bool    isNull;    
      keyval = ExecEvalExpr(keyexpr, econtext, &isNull); /* Get the attribute value of the tuple */      
      cdbhash(h, i + 1, keyval, isNull); /* Compute the hash function */
      i++;
    }
    target_seg = cdbhashreduce(h);
  }else {
    target_seg = cdbhashrandomseg(h->numsegs);
  }
  MemoryContextSwitchTo(oldContext);
  return target_seg;
}      

Result

關于Result節點的詳細内容可以參考PostgreSQL資料庫查詢執行——控制節點Result。greenplum在Result節點引入numHashFilterCols、hashFilterColIdx和hashFilterFuncs成員,為每行資料的hashFilterColIdx數組指定列計算cdbhash值,如果映射的segment不是目前執行該ResultNode的segment,該行資料将被抛棄,也就是引入了filter過濾資料功能。

Greenplum資料庫資料分片政策Hash分布——執行器行為

其執行堆棧為ExecInitNode --> ExecInitResult --> makeCdbHash。如果numHashFilterCols大于零時,使用numHashFilterCols、hashFilterFuncs和planNumSegments建立CdbHash結構體。

Greenplum資料庫資料分片政策Hash分布——執行器行為

Result節點執行堆棧為ExecResult --> TupleMatchesHashFilter(nodeResult.c) --> cdbhashinit\cdbhash。TupleMatchesHashFilter函數用于處理ExecInitNode greenplum在Result節點引入了filter過濾資料功能。TupleMatchesHashFilter如果資料tuple比對hash filter時傳回true。

/* Returns true if tuple matches hash filter. */
static bool TupleMatchesHashFilter(ResultState *node, TupleTableSlot *resultSlot){
  Result     *resultNode = (Result *)node->ps.plan;
  bool    res = true;
  if (node->hashFilter) {  // CdbHash結構體
    cdbhashinit(node->hashFilter);
    for (int i = 0; i < resultNode->numHashFilterCols; i++){ // 處理參與過濾的每個列
      int      attnum = resultNode->hashFilterColIdx[i]; // 取出列号
      Datum    hAttr; bool    isnull;
      hAttr = slot_getattr(resultSlot, attnum, &isnull);
      cdbhash(node->hashFilter, i + 1, hAttr, isnull); // 将列資料加入hash計算
    }
    int targetSeg = cdbhashreduce(node->hashFilter);
    res = (targetSeg == GpIdentity.segindex); // 判定是否是本segment index
  }
  return res;
}      

SplitUpdate

在Greenplum資料庫中,允許使用者更新元組的分片鍵,使得資料從目前節點轉移到另一個節點。Greenplum資料庫非常巧妙的利用了 Motion Node 可以在節點之間進行資料傳輸的特性,以及 PostgreSQL 的火山模型,實作了更新分片鍵的特性。PostgreSQL中并沒有任何的一個執行器節點能夠完成分布式資料的更新,Greenplum 引入了一個新的執行器節點 SplitUpdate。SplitUpdate 的原理其實非常簡單,其過程和 PostgreSQL 中更新資料類似,隻不過兩者的出發點不同而已。當我們執行 update 語句時,并不會原地更新資料,而是先插入一條更新後的資料,再删除舊資料。Greenplum 所實作的 Split Update 并不會執行 ExecUpdate() 函數,而是主動地将更新操作分裂成 INSERT 和 DELETE(其實就是在更新後資料存儲segment執行INSERT,在原資料存儲segment執行DELETE)。

postgres=# create table t (a int, b int);  // 分布鍵為a,分布政策為random
postgres=# insert into t values (1), (2), (3);
postgres=# select gp_segment_id, * from t; -- ①
 gp_segment_id | a | b 
---------------+---+---
 1 | 1 | 
 0 | 2 | 
 0 | 3 | 
(3 rows)      

如上例所示,從 ① 中我們可以看到資料目前是分布在 segment-0 和 segment-1 上的,當我們更新分布鍵 a 以後,資料将全部分布在了 segment-0 上。

postgres=# explain (verbose, costs off) update t set a = a + 1;
 QUERY PLAN 
--------------------------------------------------------------------
 Update on public.t -- ⑤
 -> Explicit Redistribute Motion 3:3 (slice1; segments: 3) -- ④
 Output: ((a + 1)), b, ctid, gp_segment_id, (DMLAction)
 -> Split
 Output: ((a + 1)), b, ctid, gp_segment_id, DMLAction -- ③
 -> Seq Scan on public.t
 Output: (a + 1), b, ctid, gp_segment_id -- ②
 Optimizer: Postgres query optimizer
(8 rows)      

在 ② 中我們可以看到,除了傳回表 t 所有的定義列以及 ctid 以外,還額外傳回了目前 tuple 的所處的 segment 的 segment_id,這是因為 Greenplum 是分布式資料庫,必須由 ​

​(gp_segment_id, ctid)​

​ 兩者才能唯一确定一個 tuple。而在 Split 節點中,輸出列又多了一個 DMLAction,這一列将會儲存 Split 節點向上輸出的兩個 tuple 中,哪一個執行插入,哪一個執行删除。Motion 節點則将資料發送至正确的 segment 上進行執行,并根據 DMLAction 的值決定執行插入還是删除。

postgres=# update t set a = a + 1;
UPDATE 3
postgres=# select gp_segment_id, * from t; 
 gp_segment_id | a | b 
---------------+---+---
 0 | 3 | 
 0 | 4 | 
 0 | 2 | 
(3 rows)      
Greenplum資料庫資料分片政策Hash分布——執行器行為

通常來說,當我們想要給 PostgreSQL 添加一個新文法或者新功能時,通常會按照添加新文法->添加新路徑->添加新查詢計劃->添加新的執行器節點等步驟來完成,但是 UPDATE 本身就是 PostgreSQL 原生的功能,是以不需要在 gram.y 中添加新的文法。但是,從 Split Update 的執行計劃我們可以看到,除了原有的 SeqScan 節點和 Update 節點以外,還會有 Split 節點和 Motion 節點。其中 Split 節點用于将一個 tuple 分裂成兩個,Motion 節點則将 Split 節點傳回的 tuple 根據分片鍵發送至正确的 segment 上執行插入或者删除。在 PostgreSQL 中,一個執行節點對應着一個 Plan Node,而一個 Plan Node 則是由最優的 Path 所構成的。是以,我們需要實作 Split Path 和 Split Plan,Motion Path 和 Motoin 已經被實作了,我們可以直接拿過來複用。Split Path 其實非常簡單,我們隻需要在其傳回的列中添加一個名叫 DMLAction 的标志位即可,它既 表示在執行 Modify Table 時執行的是 Split Update,同時也可以儲存對于目前 tuple 是執行插入還是删除。接下來我們就來看下在 PostgreSQL 中新增一個 Path 有多麼的簡單。

首先,我們要決定 Split Update Path 中到底需要什麼,或者說有哪些資訊是需要該節點儲存的。首先,Split Update Path 隻會有一個 SubPath,不存在左右子樹的情況,如 WindowAggPath。其次,我們最好是在 Split Update Path 中儲存我們到底更新的是哪一張表,這樣一來友善 Motion Path 打開這張表并擷取 Distributed Policy,進而決定到底需不需要添加 Motion 節點。如果被更新表是随機分布或者是複制表的話,那麼就不需要 Motion 節點了。是以,我們就得到了如下 Path 結構:

typedef struct SplitUpdatePath {
 Path path;
 Path *subpath; /* 子 Path,通常都是 ProjectionPath */
 Index resultRelation; /* 被更新表在 RTE 連結清單中的下标(從 1 開始) */
} SplitUpdatePath;      

緊接着,我們需要一個函數,來建立我們的 SplitUpdatePath 結構,這個函數的作用也非常簡單,隻需要把 SplitUpdatePath 添加到下層路徑之上,并将标志位添加至 PathTarget 中即可:

typedef struct DMLActionExpr {
 Expr xpr;
} DMLActionExpr;


static SplitUpdatePath *make_splitupdate_path(PlannerInfo *root, Path *subpath, Index rti) {
 RangeTblEntry *rte;
 PathTarget *splitUpdatePathTarget;
 SplitUpdatePath *splitupdatepath;
 DMLActionExpr *actionExpr; 
 rte = planner_rt_fetch(rti, root); /* Suppose we already hold locks before caller */
 
 actionExpr = makeNode(DMLActionExpr); /* 建立 DMLAction 列 */
 splitUpdatePathTarget = copy_pathtarget(subpath->pathtarget); 
 add_column_to_pathtarget(splitUpdatePathTarget, (Expr *) actionExpr, 0); /* 将 DMLAction 插入到 target list 的尾部,在執行階段取出 */

 /* populate information generated above into splitupdate node */
 splitupdatepath = makeNode(SplitUpdatePath); 
 splitupdatepath->path.pathtype = T_SplitUpdate; /* Split Path 的節點類型為 T_SplitUpdate */ 
 splitupdatepath->path.parent = subpath->parent; /* 它們具有相同的 RelOptInfo */ 
 splitupdatepath->path.pathtarget = splitUpdatePathTarget; /* 替換 pathtarget,即傳回列必須多出 DMLAction 列 */
 splitupdatepath->path.rows = 2 * subpath->rows;  /* 預估的 tuple 數量,由于 split update = delete + insert,是以會有 2 條資料 */ 
 splitupdatepath->path.param_info = NULL; /* 其餘參數照抄即可 */
 splitupdatepath->path.parallel_aware = false;
 splitupdatepath->path.parallel_safe = subpath->parallel_safe;
 splitupdatepath->path.parallel_workers = subpath->parallel_workers;
 splitupdatepath->path.startup_cost = subpath->startup_cost;
 splitupdatepath->path.total_cost = subpath->total_cost;
 splitupdatepath->path.pathkeys = subpath->pathkeys;
 splitupdatepath->path.locus = subpath->locus;
 /* 包裹 ProjectionPath,使其成為子節點 */
 splitupdatepath->subpath = subpath;
 splitupdatepath->resultRelation = rti;
 /* 這裡也隻是在原有的 path 下面加了一個一個屬性而已 */
 return splitupdatepath;
}      
Greenplum資料庫資料分片政策Hash分布——執行器行為

當有了 SplitUpdatePath 以後,剩下的就是将 Path 轉換成 Plan,由于我們并沒有其它可供競争的 Path,是以直接建構即可:

/* SplitUpdate Plan Node */ // 定義在src/include/nodes/plannodes.h中
typedef struct SplitUpdate {
 Plan plan;
 AttrNumber actionColIdx; /* DMLAction 在 targetlist 中的位置,便于快速通路 */
 AttrNumber tupleoidColIdx; /* ctid 在 targetlist 中的位置,便于快速通路 */
 List *insertColIdx; /* 執行 Insert 時需要使用到的 target list */
 List *deleteColIdx; /* 執行 Delete 時需要使用到的 target list */
 /* 下面的字段就是 Distributed Policy,在更新以哈希分布的表時會使用,主要用來計算
 * Insert 的 tuple 到哪個 segment,Delete 的話直接用本 segment 的 gp_segment_id
 * 即可。*/
 int numHashAttrs;
 AttrNumber *hashAttnos;
 Oid *hashFuncs; /* corresponding hash functions */
 int numHashSegments; /* # of segs to use in hash computation */
} SplitUpdate;      

接下來就是在 create_splitupdate_plan() 函數中填充 SplitUpdate 計劃節點中的字段,主要就是一些索引 List 和 Distributed Policy,這些内容流程比較簡單,但是又沒有現成的通用函數來完成,是以會有些繁瑣:

static Plan * create_splitupdate_plan(PlannerInfo *root, SplitUpdatePath *path) {
 Path *subpath = path->subpath;
 Plan *subplan;
 SplitUpdate *splitupdate;
 Relation resultRel;
 TupleDesc resultDesc;
 GpPolicy *cdbpolicy;
 int attrIdx;
 ListCell *lc;
 int lastresno;
 Oid *hashFuncs;
 int i;

 /* 擷取更新表的 Distributed Policy */
 resultRel = relation_open(planner_rt_fetch(path->resultRelation, root)->relid, NoLock);
 resultDesc = RelationGetDescr(resultRel);
 cdbpolicy = resultRel->rd_cdbpolicy;

 /* 遞歸建構 subpath 的 Plan */
 subplan = create_plan_recurse(root, subpath, CP_EXACT_TLIST);

 /* Transfer resname/resjunk labeling, too, to keep executor happy */
 apply_tlist_labeling(subplan->targetlist, root->processed_tlist);

 splitupdate = makeNode(SplitUpdate);

 splitupdate->plan.targetlist = NIL; /* filled in below */
 splitupdate->plan.qual = NIL;
 splitupdate->plan.lefttree = subplan;
 splitupdate->plan.righttree = NULL;

 copy_generic_path_info(&splitupdate->plan, (Path *) path);

 lc = list_head(subplan->targetlist);
 /* 周遊目标更新表的所有屬性 */
 for (attrIdx = 1; attrIdx <= resultDesc->natts; ++attrIdx) {
 TargetEntry *tle;
 Form_pg_attribute attr;

 tle = (TargetEntry *) lfirst(lc);
 lc = lnext(lc);

 attr = &resultDesc->attrs[attrIdx - 1];

 /* 建構 Insert 和 Delete 清單,其中 deleteColIdx 僅僅隻是為了滿足格式要求,無實際作用 */
 splitupdate->insertColIdx = lappend_int(splitupdate->insertColIdx, attrIdx);
 splitupdate->deleteColIdx = lappend_int(splitupdate->deleteColIdx, -1);

 splitupdate->plan.targetlist = lappend(splitupdate->plan.targetlist, tle);
 }
 lastresno = list_length(splitupdate->plan.targetlist);

 /* ....... */

 splitupdate->plan.targetlist = lappend(splitupdate->plan.targetlist,
 makeTargetEntry((Expr *) makeNode(DMLActionExpr),
 ++lastresno, "DMLAction", true));

 /* 建構 Distributed Policy 相關,例如哈希函數、分片鍵等等 */
 hashFuncs = palloc(cdbpolicy->nattrs * sizeof(Oid));
 for (i = 0; i < cdbpolicy->nattrs; i++) {
 AttrNumber attnum = cdbpolicy->attrs[i];
 Oid typeoid = resultDesc->attrs[attnum - 1].atttypid;
 Oid opfamily;

 opfamily = get_opclass_family(cdbpolicy->opclasses[i]);

 hashFuncs[i] = cdb_hashproc_in_opfamily(opfamily, typeoid);
 }
 splitupdate->numHashAttrs = cdbpolicy->nattrs;
 splitupdate->hashAttnos = palloc(cdbpolicy->nattrs * sizeof(AttrNumber));
 memcpy(splitupdate->hashAttnos, cdbpolicy->attrs, cdbpolicy->nattrs * sizeof(AttrNumber));
 splitupdate->hashFuncs = hashFuncs;
 splitupdate->numHashSegments = cdbpolicy->numsegments;

 relation_close(resultRel, NoLock);
 root->numMotions++;

 return (Plan *) splitupdate;
}      

當我們有了 Path 和 Plan 之後,剩下的就是執行器了。下面是對SplitUpdate進行初始化。

/* ExecNode for Split.
 * This operator contains a Plannode in PlanState. The Plannode contains indexes to the ctid, insert, delete, resjunk columns needed for adding the action (Insert/Delete). A MemoryContext and TupleTableSlot are maintained to keep the INSERT tuple until requested.
 */
typedef struct SplitUpdateState {
  PlanState  ps;
  bool    processInsert;  /* flag that specifies the operator's next action. */
  TupleTableSlot *insertTuple;  /* tuple to Insert */
  TupleTableSlot *deleteTuple;  /* tuple to Delete */
  AttrNumber  input_segid_attno;    /* attribute number of "gp_segment_id" in subplan's target list */
  AttrNumber  output_segid_attno;    /* attribute number of "gp_segment_id" in output target list */
  struct CdbHash *cdbhash;  /* hash api object */
} SplitUpdateState;      

SplitUpdate節點的初始化堆棧如下ExecInitNode --> ExecInitSplitUpdate --> makeCdbHash和ExecInitNode --> ExecInitSplitUpdate —> 設定ExecProcNode為ExecSplitUpdate(該函數會調用cdbhashinit\cdbhash)。

/* Init SplitUpdate Node. A memory context is created to hold Split Tuples. */
SplitUpdateState* ExecInitSplitUpdate(SplitUpdate *node, EState *estate, int eflags){
  SplitUpdateState *splitupdatestate = makeNode(SplitUpdateState);
  splitupdatestate->ps.plan = (Plan *)node;
  splitupdatestate->ps.state = estate;
  splitupdatestate->ps.ExecProcNode = ExecSplitUpdate; // 設定ExecProcNode為ExecSplitUpdate(該函數會調用cdbhashinit\cdbhash)
  splitupdatestate->processInsert = true;  
  Plan *outerPlan = outerPlan(node); /* then initialize outer plan */
  outerPlanState(splitupdatestate) = ExecInitNode(outerPlan, estate, eflags);
  ExecAssignExprContext(estate, &splitupdatestate->ps);

  /* New TupleDescriptor for output TupleTableSlots (old_values + new_values, ctid, gp_segment, action). */
  TupleDesc tupDesc = ExecTypeFromTL(node->plan.targetlist);
  splitupdatestate->insertTuple = ExecInitExtraTupleSlot(estate, tupDesc, &TTSOpsVirtual);
  splitupdatestate->deleteTuple = ExecInitExtraTupleSlot(estate, tupDesc, &TTSOpsVirtual);

  /* Look up the positions of the gp_segment_id in the subplan's target list, and in the result. */
  splitupdatestate->input_segid_attno = ExecFindJunkAttributeInTlist(outerPlan->targetlist, "gp_segment_id");
  splitupdatestate->output_segid_attno = ExecFindJunkAttributeInTlist(node->plan.targetlist, "gp_segment_id");

  /* DML nodes do not project. */
  ExecInitResultTupleSlotTL(&splitupdatestate->ps, &TTSOpsVirtual);
  splitupdatestate->ps.ps_ProjInfo = NULL;

  /* Initialize for computing hash key */
  if (node->numHashAttrs > 0)
    splitupdatestate->cdbhash = makeCdbHash(node->numHashSegments,node->numHashAttrs,node->hashFuncs); // 建立cdbhash
  if (estate->es_instrument && (estate->es_instrument & INSTRUMENT_CDB))
    splitupdatestate->ps.cdbexplainbuf = makeStringInfo();
  return splitupdatestate;
}      

對于 Split 執行節點而言,要做的事情就是将下層執行器節點傳回的 tuple 一分為二并儲存起來,一個表示删除,一個表示插入,然後繼續向上傳回給上層節點。執行堆棧如下所示ExecSplitUpdate --> SplitTupleTableSlot --> evalHashkey(nodeSplictUpdate.c) --> cdbhashinit\cdbhash。

/* Splits every TupleTableSlot into two TupleTableSlots: DELETE and INSERT. */
static TupleTableSlot *ExecSplitUpdate(PlanState *pstate){
  SplitUpdateState *node = castNode(SplitUpdateState, pstate);
  PlanState *outerNode = outerPlanState(node);
  SplitUpdate *plannode = (SplitUpdate *) node->ps.plan;
  TupleTableSlot *slot = NULL;
  TupleTableSlot *result = NULL;


  /* Returns INSERT TupleTableSlot. */
  if (!node->processInsert)
  {
    result = node->insertTuple;

    node->processInsert = true;
  }
  else
  {
    /* Creates both TupleTableSlots. Returns DELETE TupleTableSlots.*/
    slot = ExecProcNode(outerNode);

    if (TupIsNull(slot))
    {
      return NULL;
    }

    /* `Split' update into delete and insert */
    slot_getallattrs(slot);
    Datum     *values = slot->tts_values;
    bool     *nulls = slot->tts_isnull;

    ExecStoreAllNullTuple(node->deleteTuple);
    ExecStoreAllNullTuple(node->insertTuple);

    SplitTupleTableSlot(slot, plannode->plan.targetlist, plannode, node, values, nulls);

    result = node->deleteTuple;
    node->processInsert = false;

  }

  return result;
}      

SplitUpdate節點後續再研究

ExecInitNode --> ExecPartitionDistribute --> distributeTupleTableSlot --> ExecFindTargetPartitionContentId --> findTargetContentid --> makeCdbHashForRelation --> makeCdbHash