
Greenplum Database External Tables: The fileam Wrapper


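This post focuses on src/backend/access/external/fileam.c. The file encapsulates the low-level details of fetching data and flow control, and exposes abstract APIs (SCAN and INSERT) to the executor nodes above it.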

SCAN

ExecInitExternalScan
 | -- external_beginscan

ExecExternalScan 
 | -- ExternalNext
   | -- external_getnext_init
   | -- external_getnext
       
ExecReScanExternal
 | -- external_rescan
      | -- external_stopscan

ExecSquelchExternalScan
 | -- external_stopscan

ExecEndExternalScan/ExternalNext/ExecSquelchExternalScan
 | -- ExecEagerFreeExternalScan
       | -- external_endscan      

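The overall flow of ExecInitExternalScan was described in the article "Greenplum Database External Tables: the Scan Execution Node". Here we focus on the call

currentScanDesc = external_beginscan(currentRelation, node->scancounter, node->uriList, node->fmtOptString, node->fmtType, node->isMasterOnly, node->rejLimit, node->rejLimitInRows, node->logErrors, node->encoding)

Its main job is to create a FileScanDesc structure and store it in the ExternalScanState.ess_ScanDesc member, as shown by the yellow box in the figure below.

The function first allocates memory for FileScanDescData and sets fs_inited to false to indicate that the FileScanDesc has not finished initializing. fs_rd is set to the relation argument and fs_scancounter to the scancounter argument. The fs_noop member indicates that this segment has no data to fetch (that is, "no operation"); fs_file is set to NULL; raw_buf_done is used only for custom external tables.

If

relation->rd_att->constr != NULL && relation->rd_att->constr->num_check > 0

holds, the external table has CHECK constraints, and the constraint expressions are stored in the fs_constraintExprs member.

Next, initialization depends on the process role. On a QE, the URI for this segment is taken from uriList according to its segindex and stored in fs_uri. On the QD, if isMasterOnly is set, the table is an ON MASTER table, and its URI is likewise stored in fs_uri.

The input function and element type of every column are then looked up in the system catalogs and stored in the in_functions and typioparams arrays. For the custom format, custom_formatter_name and custom_formatter_params are extracted from the fmtOptString argument; otherwise parseCopyFormatString parses fmtOptString into copyFmtOpts. Then

BeginCopyFrom(relation, NULL, false, external_getdata_callback, (void *) scan, NIL, copyFmtOpts, NIL)

initializes the fs_pstate member. The function then fetches this table's entry from the pg_exttable catalog and calls

InitParseState(scan->fs_pstate, relation, false, fmtType, scan->fs_uri, rejLimit, rejLimitInRows, logErrors, extentry->options)

to initialize fs_pstate further. Finally, for a custom external table, the OID of the formatter function is looked up by custom_formatter_name and stored in fs_custom_formatter_func, and FormatterData memory is allocated for fs_formatter.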

[Figure: the FileScanDesc created by external_beginscan, stored in ExternalScanState.ess_ScanDesc (yellow box)]
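To make the role-dependent URI choice concrete, here is a minimal C sketch. It assumes uriList can be modeled as a plain array with one entry per segment, where a NULL entry means the segment has nothing to read (the fs_noop case). The names Role and pick_scan_uri are hypothetical and are not part of fileam.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for Greenplum's process roles. */
typedef enum { ROLE_QD, ROLE_QE } Role;

/*
 * Sketch of how external_beginscan might pick fs_uri.
 * uri_list holds one entry per segment; NULL means "no URI for this segment".
 * Returns the chosen URI and sets *noop when the scan should do nothing.
 */
static const char *pick_scan_uri(Role role, int segindex, bool is_master_only,
                                 const char *const *uri_list, int n_uris,
                                 bool *noop)
{
    const char *uri = NULL;

    if (role == ROLE_QE && segindex >= 0 && segindex < n_uris)
        uri = uri_list[segindex];      /* QE: the entry for this segment */
    else if (role == ROLE_QD && is_master_only && n_uris > 0)
        uri = uri_list[0];             /* QD: ON MASTER table */

    *noop = (uri == NULL);             /* analogous to fs_noop */
    return uri;
}
```

A segment whose entry is NULL still takes part in the plan; it simply returns no rows.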

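ExternalNext first calls external_getnext_init to create an ExternalSelectDescData structure. If node->ss.ps is not NULL, its ps_ProjInfo is stored in the projInfo member of the ExternalSelectDesc, and when gp_external_enable_filter_pushdown is on, the plan's qual is stored in filter_quals. Projection (projInfo) and filtering (filter_quals) can therefore be pushed down into external_getnext. Tuples are fetched with external_getnext, which is described in "Greenplum Database External Tables: Fetching Tuples with external_getnext", and the ExternalSelectDescData structure is passed down through that function; as a result, only custom external tables can perform predicate pushdown.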

static TupleTableSlot *ExternalNext(ExternalScanState *node){
  bool    scanNext = true;  
  EState     *estate = node->ss.ps.state; /* get information from the estate and scan state */
  FileScanDesc scandesc = node->ess_ScanDesc;
  ScanDirection direction = estate->es_direction;
  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
  
  ExternalSelectDesc externalSelectDesc = external_getnext_init(&(node->ss.ps));
  if (gp_external_enable_filter_pushdown)
    externalSelectDesc->filter_quals = node->ss.ps.plan->qual;
    
  /* get the next tuple from the file access methods */
  while(scanNext){
    HeapTuple tuple = external_getnext(scandesc, direction, externalSelectDesc);
    if (tuple){
      ExecStoreHeapTuple(tuple, slot, InvalidBuffer, true);
      if (node->ess_ScanDesc->fs_hasConstraints && !ExternalConstraintCheck(slot, node)){
        ExecClearTuple(slot);
        continue;
      }
      /* CDB: Label each row with a synthetic ctid if needed for subquery dedup.  */
      if (node->cdb_want_ctid && !TupIsNull(slot)){ slot_set_ctid_from_fake(slot, &node->cdb_fake_ctid); }
    }else{
      ExecClearTuple(slot);
      if (!node->delayEagerFree){ ExecEagerFreeExternalScan(node); }
    }
    scanNext = false;
  }
  pfree(externalSelectDesc);
  return slot;
}      
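The while loop above repeats only when a tuple fails the table's CHECK constraints (the continue); any other outcome sets scanNext to false and returns. The sketch below reproduces that control flow over an in-memory array. Source, source_next, check_constraint and external_next_sketch are hypothetical stand-ins for external_getnext and ExternalConstraintCheck, and the constraint value >= 0 is made up for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical in-memory "external source": integers read in order. */
typedef struct {
    const int *rows;
    size_t     n;
    size_t     pos;
} Source;

/* Stand-in for external_getnext: returns false once the source is exhausted. */
static bool source_next(Source *src, int *out)
{
    if (src->pos >= src->n)
        return false;
    *out = src->rows[src->pos++];
    return true;
}

/* Stand-in for ExternalConstraintCheck: here a made-up CHECK (value >= 0). */
static bool check_constraint(int value)
{
    return value >= 0;
}

/*
 * Mirrors ExternalNext: keep fetching while rows fail the constraint,
 * stop at the first passing row or at end of data.
 * Returns false (an empty slot) at end of data.
 */
static bool external_next_sketch(Source *src, bool has_constraints, int *slot)
{
    int value;

    while (source_next(src, &value)) {
        if (has_constraints && !check_constraint(value))
            continue;                  /* like ExecClearTuple + continue */
        *slot = value;
        return true;
    }
    return false;                      /* where ExternalNext may eager-free */
}
```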

The execution flow of ExecReScanExternal is shown below. It essentially just calls external_rescan, which in turn calls external_stopscan.

ExecReScanExternal
 | -- FileScanDesc fileScan = node->ess_ScanDesc
 | -- external_rescan
      | -- external_stopscan      
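A minimal sketch of why stopping the scan can be enough for a rescan. It assumes, as external_insert does with its output stream, that the readable source is opened lazily on the first fetch, so the first fetch after external_stopscan reopens it from the beginning. RescanSketch and the *_sketch functions are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical scan state; is_open plays the role of fs_file != NULL. */
typedef struct {
    bool is_open;
    int  next_row;   /* read position inside the external source */
    int  opens;      /* how many times the source has been opened */
} RescanSketch;

/* Like external_stopscan: close the source if it is open. */
static void stopscan_sketch(RescanSketch *s)
{
    s->is_open = false;
}

/* Like a lazily opening external_getnext: open on first fetch, then read. */
static int getnext_sketch(RescanSketch *s)
{
    if (!s->is_open) {
        s->is_open = true;
        s->next_row = 0;     /* a freshly opened source starts at the beginning */
        s->opens++;
    }
    return s->next_row++;
}

/* Like external_rescan: stop the current scan; the next fetch reopens it. */
static void rescan_sketch(RescanSketch *s)
{
    stopscan_sketch(s);
}
```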

ExecSquelchExternalScan performs identically to ExecEndExternalScan, except that errors raised while closing the source are ignored. It is called for normal termination when the external data source is NOT exhausted (for example, because of a LIMIT clause). The call chain is:

ExecSquelchNode --> ExecSquelchExternalScan --> external_stopscan
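The difference between the squelch path and the normal end path can be sketched as one close routine with a fail-on-error flag. This is an assumption modeled on the second argument of url_fclose, which external_insert_finish passes as true; ExtSourceSketch and the *_sketch functions are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for an open external source. */
typedef struct {
    bool open;
    bool close_fails;   /* simulate a failure reported when closing */
} ExtSourceSketch;

/*
 * Close the source. Returns 0 on success, -1 when the close failed and the
 * caller asked for errors to be reported (real code would raise an ERROR).
 */
static int close_source_sketch(ExtSourceSketch *src, bool fail_on_error)
{
    if (!src->open)
        return 0;
    src->open = false;                 /* the source is released either way */
    if (src->close_fails && fail_on_error)
        return -1;
    return 0;
}

/* End of a fully consumed scan: closure errors are reported. */
static int end_scan_sketch(ExtSourceSketch *src)
{
    return close_source_sketch(src, true);
}

/* Squelch (e.g. a LIMIT was satisfied early): closure errors are ignored. */
static int squelch_scan_sketch(ExtSourceSketch *src)
{
    return close_source_sketch(src, false);
}
```

Ignoring errors on the squelch path makes sense because a producer that is cut off early (for example an EXECUTE command whose output is no longer read) may legitimately report a failure on close.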

ExecEndExternalScan is mainly responsible for releasing any storage allocated by the flow above: it releases the resources held by node->ss.ps, node->ss.ps.ps_ResultTupleSlot, node->ss.ss_ScanTupleSlot and node->ess_ScanDesc, closes the relation, and runs ExecEagerFreeExternalScan.

ExecEndExternalScan/ExternalNext/ExecSquelchExternalScan
 | -- ExecEagerFreeExternalScan
       | -- external_endscan      
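Because ExecEagerFreeExternalScan can be reached from ExternalNext, ExecSquelchExternalScan and ExecEndExternalScan, it has to be safe to reach more than once. One common way to achieve that, sketched below under that assumption, is to clear ess_ScanDesc after the end-scan call so later calls become no-ops. The *Sketch types and functions are hypothetical.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for FileScanDescData. */
typedef struct { int dummy; } FileScanDescSketch;

/* Hypothetical stand-in for ExternalScanState. */
typedef struct {
    FileScanDescSketch *ess_ScanDesc;
    int endscan_calls;              /* counts end-scan invocations */
} ExternalScanStateSketch;

/* Stand-in for external_endscan: release the scan descriptor. */
static void endscan_sketch(ExternalScanStateSketch *node)
{
    free(node->ess_ScanDesc);
    node->endscan_calls++;
}

/* Eager free: end the scan once, then clear the pointer so repeats are no-ops. */
static void eager_free_sketch(ExternalScanStateSketch *node)
{
    if (node->ess_ScanDesc == NULL)
        return;
    endscan_sketch(node);
    node->ess_ScanDesc = NULL;
}
```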

INSERT

The INSERT path into an external table consists of three functions: external_insert_init, external_insert and external_insert_finish. All three work mainly with the ExternalInsertDescData structure shown below.

CopyFrom/ExecInsert
 | -- external_insert_init
CopyFrom/ExecInsert
 | -- external_insert
CopyFrom/ExecEndPlan
 | -- CloseResultRelInfo 
       | -- external_insert_finish      
[Figure: the ExternalInsertDescData structure]

ExternalInsertDesc external_insert_init(Relation rel)

first allocates memory for the ExternalInsertDesc and sets ext_rel to the rel argument. On the QD, ext_noop is set to true; on a QE, it is set to false. For an EXECUTE external table, ext_uri is set to

execute:extentry->command

that is, the prefix "execute:" followed by the command string. Otherwise the table uses gpfdist or a custom protocol, and each segment takes its own URI from extentry->urilocations, choosing the entry

int my_url = segindex % num_urls

and storing it in extInsertDesc->ext_uri. Memory is then allocated for ext_pstate, ext_tupDesc, ext_values and ext_nulls. If fmtcode is custom, fmtopts is parsed with

parseCustomFormatString(extentry->fmtopts, &custom_formatter_name, &custom_formatter_params)

and otherwise with

copyFmtOpts = parseCopyFormatString(rel, extentry->fmtopts, extentry->fmtcode)

Next,

BeginCopyToForeignTable(rel, copyFmtOpts)

initializes extInsertDesc->ext_pstate, and

InitParseState(extInsertDesc->ext_pstate, rel, true, extentry->fmtcode, extInsertDesc->ext_uri, extentry->rejectlimit, (extentry->rejectlimittype == 'r'), extentry->logerrors, extentry->options)

completes the initialization of ext_pstate. Finally, for a custom external table, the OID of the formatter function is looked up by custom_formatter_name and stored in ext_custom_formatter_func, and FormatterData memory is allocated for ext_formatter_data.
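The ext_uri selection can be sketched as follows. It applies the two rules just described (an "execute:" prefix for EXECUTE tables, segindex % num_urls otherwise); representing urilocations as a C string array and the name pick_insert_uri are assumptions for illustration.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Sketch of ext_uri selection in external_insert_init.
 * command != NULL means an EXECUTE table; otherwise pick from urilocations.
 * Writes the URI into buf; returns buf, or NULL if buf is too small.
 */
static const char *pick_insert_uri(const char *command,
                                   const char *const *urilocations, int num_urls,
                                   int segindex, char *buf, size_t buflen)
{
    int n;

    if (command != NULL) {
        n = snprintf(buf, buflen, "execute:%s", command);
    } else {
        assert(num_urls > 0 && segindex >= 0);
        int my_url = segindex % num_urls;   /* round-robin over the URIs */
        n = snprintf(buf, buflen, "%s", urilocations[my_url]);
    }
    return (n >= 0 && (size_t) n < buflen) ? buf : NULL;
}
```

With two URIs, segments 0, 2, 4, ... write to the first location and segments 1, 3, 5, ... to the second.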

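Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup)

formats a tuple and writes it to the external data source. It differs from heap_insert in three ways: WAL is bypassed, transaction information is ignored, and the tuple is always sent to the destination (a local file or a remote target). Its main steps are: open the external data source if it is not open yet; deconstruct the tuple and format it into the external source's data format; and call external_senddata to send the data. On the QD, where ext_noop is true, it returns InvalidOid immediately without writing anything.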

Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup){
  TupleDesc tupDesc = extInsertDesc->ext_tupDesc;
  Datum    *values = extInsertDesc->ext_values;
  bool     *nulls = extInsertDesc->ext_nulls;
  CopyStateData *pstate = extInsertDesc->ext_pstate;
  bool    customFormat = (extInsertDesc->ext_custom_formatter_func != NULL);

  if (extInsertDesc->ext_noop) return InvalidOid;
  /* Open our output file or output stream if not yet open */
  if (!extInsertDesc->ext_file && !extInsertDesc->ext_noop) open_external_writable_source(extInsertDesc);

  /* deconstruct the tuple and format it into text */
  if (!customFormat){/* TEXT or CSV */
    heap_deform_tuple(instup, tupDesc, values, nulls);
    CopyOneRowTo(pstate, HeapTupleGetOid(instup), values, nulls);
    CopySendEndOfRow(pstate);
  }else{ /* custom format. convert tuple using user formatter */
    FunctionCallInfoData fcinfo;
    /* There is some redundancy between FormatterData and ExternalInsertDesc we may be able to consolidate data structures a little. */
    FormatterData *formatter = extInsertDesc->ext_formatter_data;
    /* per call formatter prep */
    FunctionCallPrepareFormatter(&fcinfo,1,pstate,extInsertDesc->ext_custom_formatter_func,extInsertDesc->ext_custom_formatter_params,formatter,extInsertDesc->ext_rel,extInsertDesc->ext_tupDesc,pstate->out_functions,NULL);
    /* Mark the correct record type in the passed tuple */
    HeapTupleHeaderSetDatumLength(instup->t_data, instup->t_len);
    HeapTupleHeaderSetTypeId(instup->t_data, tupDesc->tdtypeid);
    HeapTupleHeaderSetTypMod(instup->t_data, tupDesc->tdtypmod);
    fcinfo.arg[0] = HeapTupleGetDatum(instup);
    fcinfo.argnull[0] = false;
    Datum d = FunctionCallInvoke(&fcinfo);
    MemoryContextReset(formatter->fmt_perrow_ctx);

    /* We do not expect a null result */
    if (fcinfo.isnull) elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);

    bytea    *b = DatumGetByteaP(d);
    CopyOneCustomRowTo(pstate, b);
  }
  
  external_senddata(extInsertDesc->ext_file, pstate); /* Write the data into the external source */

  /* Reset our buffer to start clean next round */
  pstate->fe_msgbuf->len = 0;
  pstate->fe_msgbuf->data[0] = '\0';

  return HeapTupleGetOid(instup);
}      
void external_insert_finish(ExternalInsertDesc extInsertDesc){
  /* Close the external source */
  if (extInsertDesc->ext_file){
    char     *relname = RelationGetRelationName(extInsertDesc->ext_rel);
    url_fflush(extInsertDesc->ext_file, extInsertDesc->ext_pstate);
    url_fclose(extInsertDesc->ext_file, true, relname);
  }
  if (extInsertDesc->ext_formatter_data) pfree(extInsertDesc->ext_formatter_data);
  pfree(extInsertDesc);
}      
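The per-row TEXT path in external_insert (format the row into pstate->fe_msgbuf, send it, then reset the buffer so the next row starts clean) can be sketched as below. MsgBuf and the helper functions are hypothetical stand-ins for StringInfo, CopyOneRowTo/CopySendEndOfRow and external_senddata; only PostgreSQL's default TEXT conventions (tab delimiter, \N for NULL) are modeled, and value escaping is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Minimal stand-in for pstate->fe_msgbuf (a StringInfo). */
typedef struct {
    char   data[256];
    size_t len;
} MsgBuf;

/* Append a NUL-terminated string, keeping data NUL-terminated. */
static void buf_append(MsgBuf *b, const char *s)
{
    size_t n = strlen(s);
    assert(b->len + n < sizeof b->data);
    memcpy(b->data + b->len, s, n + 1);   /* copies the terminating NUL too */
    b->len += n;
}

/* Like CopyOneRowTo + CopySendEndOfRow for TEXT: tab-separated, \N for NULL. */
static void format_text_row(MsgBuf *b, const char *const *values,
                            const bool *nulls, int natts)
{
    for (int i = 0; i < natts; i++) {
        if (i > 0)
            buf_append(b, "\t");
        buf_append(b, nulls[i] ? "\\N" : values[i]);
    }
    buf_append(b, "\n");
}

/* Stand-in for external_senddata: append the formatted row to a sink. */
static void send_data(MsgBuf *sink, const MsgBuf *row)
{
    buf_append(sink, row->data);
}

/* One external_insert call: format, send, then reset the row buffer. */
static void insert_row_sketch(MsgBuf *row, MsgBuf *sink,
                              const char *const *values, const bool *nulls, int natts)
{
    format_text_row(row, values, nulls, natts);
    send_data(sink, row);
    row->len = 0;            /* mirrors fe_msgbuf->len = 0 */
    row->data[0] = '\0';     /* mirrors fe_msgbuf->data[0] = '\0' */
}
```

Reusing one buffer per descriptor avoids allocating a new one for every row; the reset at the end is what keeps rows from accumulating.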
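For reference, here is the definition of the pg_exttable catalog, from which the extentry fields used above (URI locations, format options, command, reject limit settings and so on) are read:

#define ExtTableRelationId  6040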
CATALOG(pg_exttable,6040) BKI_WITHOUT_OIDS
{
  Oid   reloid;       /* refers to this relation's oid in pg_class  */
  text  urilocation[1];   /* array of URI strings */
  text  execlocation[1];  /* array of ON locations */
  char  fmttype;      /* 't' (text) or 'c' (csv) */
  text  fmtopts;      /* the data format options */
  text  options[1];     /* the array of external table options */
  text  command;      /* the command string to EXECUTE */
  int32 rejectlimit;    /* error count reject limit per segment */
  char  rejectlimittype;  /* 'r' (rows) or 'p' (percent) */
  bool  logerrors;      /* 't' to log errors into file */
  int32 encoding;     /* character encoding of this external table */
  bool  writable;     /* 't' if writable, 'f' if readable */
} FormData_pg_exttable;
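The rejectlimit and rejectlimittype columns feed the rejLimit and rejLimitInRows arguments seen earlier, since (rejectlimittype == 'r') is passed as rejLimitInRows. A simplified sketch of how such a limit might be evaluated follows. reject_limit_reached is hypothetical; treating 'p' as rejected rows as a percentage of processed rows is an assumption, and the sketch ignores any minimum row count the real single-row error handling may require before a percentage is checked.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical helper: has the reject limit been reached?
 * rejectlimittype is 'r' (rows) or 'p' (percent), as in pg_exttable.
 */
static bool reject_limit_reached(int rejectlimit, char rejectlimittype,
                                 long rejected, long processed)
{
    if (rejectlimittype == 'r')        /* rejLimitInRows == true */
        return rejected >= rejectlimit;

    assert(rejectlimittype == 'p');
    if (processed == 0)
        return false;
    /* rejected / processed >= rejectlimit / 100, compared without division */
    return rejected * 100 >= (long) rejectlimit * processed;
}
```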