
Greenplum Database External Tables: the fileam Wrapper


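This post focuses on src/backend/access/external/fileam.c. The file hides the low-level details of fetching data and flow control, and gives the executor nodes above it an abstract API for SCAN and INSERT.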

SCAN

ExecInitExternalScan
 | -- external_beginscan

ExecExternalScan 
 | -- ExternalNext
   | -- external_getnext_init
   | -- external_getnext
       
ExecReScanExternal
 | -- external_rescan
      | -- external_stopscan

ExecSquelchExternalScan
 | -- external_stopscan

ExecEndExternalScan/ExternalNext/ExecSquelchExternalScan
 | -- ExecEagerFreeExternalScan
       | -- external_endscan      

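The overall flow of ExecInitExternalScan was described in "Greenplum Database External Tables: the Scan Execution Node". Here we focus on this call:

currentScanDesc = external_beginscan(currentRelation, node->scancounter, node->uriList, node->fmtOptString, node->fmtType, node->isMasterOnly, node->rejLimit, node->rejLimitInRows, node->logErrors, node->encoding)

Its main job is to create a FileScanDesc and store it in ExternalScanState.ess_ScanDesc, as shown by the yellow box in the figure below. The function first allocates a FileScanDescData and fills in its fields:

- fs_inited is set to false, meaning the FileScanDesc is not fully initialized yet.
- fs_rd is set to the relation argument.
- fs_scancounter is set to the scancounter argument.
- fs_noop marks a segment that has no data to fetch (no operation).
- fs_file is set to NULL.
- raw_buf_done is used only for custom external tables.

If the following condition holds, the external table has constraints, and the constraint expressions are stored in fs_constraintExprs:

relation->rd_att->constr != NULL && relation->rd_att->constr->num_check > 0

Next, initialization depends on the role:

- On a QE, the URI for this segment is taken from uriList according to its segindex and stored in fs_uri.
- On the QD, if isMasterOnly is set, the table is an ON MASTER table, and its URI is likewise stored in fs_uri.

Then the input function and element type of every column of the external table are looked up in the system catalogs and stored in the in_functions and typioparams arrays. For the custom format, custom_formatter_name and custom_formatter_params are extracted from the fmtOptString argument. Otherwise, parseCopyFormatString parses fmtOptString into copyFmtOpts. The following call initializes fs_pstate:

BeginCopyFrom(relation, NULL, false, external_getdata_callback, (void *) scan, NIL, copyFmtOpts, NIL)

The table's entry (extentry) is then fetched from the pg_exttable catalog, and this call initializes fs_pstate further:

InitParseState(scan->fs_pstate, relation, false, fmtType, scan->fs_uri, rejLimit, rejLimitInRows, logErrors, extentry->options)

Finally, for a custom external table, the OID of the formatter function is looked up by custom_formatter_name and stored in fs_custom_formatter_func, and a FormatterData is allocated for fs_formatter.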

[Figure: ExternalScanState, with the FileScanDesc set by external_beginscan highlighted in yellow]
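The role-dependent URI choice described above can be modeled in isolation. This is a simplified sketch under stated assumptions, not Greenplum's code. SketchRole and sketch_pick_scan_uri are hypothetical, and so is the choice of entry 0 for an ON MASTER table. A NULL entry in the list models a segment with no URI, which the sketch reports as a no-op in the spirit of fs_noop.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the QD/QE role; not Greenplum's Gp_role. */
typedef enum { SKETCH_ROLE_DISPATCH, SKETCH_ROLE_EXECUTE } SketchRole;

/*
 * Return the URI this process should scan, or NULL if it has nothing to do.
 *  - QE: index uri_list by segindex.
 *  - QD: only for an ON MASTER table (is_master_only); assumed to use entry 0.
 * *noop is set when no URI applies, mirroring the meaning of fs_noop.
 */
const char *
sketch_pick_scan_uri(SketchRole role, int segindex, bool is_master_only,
                     const char *const *uri_list, int num_uris, bool *noop)
{
    const char *uri = NULL;

    assert(noop != NULL);
    assert(uri_list != NULL || num_uris == 0);

    if (role == SKETCH_ROLE_EXECUTE && segindex >= 0 && segindex < num_uris)
        uri = uri_list[segindex];
    else if (role == SKETCH_ROLE_DISPATCH && is_master_only && num_uris > 0)
        uri = uri_list[0];

    *noop = (uri == NULL);
    return uri;
}
```

Returning the list entry itself, rather than a copy, keeps the sketch allocation-free. This is similar to how fs_uri only needs to point at a string that outlives the scan.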

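ExternalNext first calls external_getnext_init to create an ExternalSelectDescData structure. If the PlanState passed in (node->ss.ps) is non-NULL, its ps_ProjInfo is stored in the projInfo member of the ExternalSelectDesc. Both projInfo and filter_quals, that is projection and filtering, can be pushed down into external_getnext. When gp_external_enable_filter_pushdown is on, the plan's quals are also stored in filter_quals.

Tuples are fetched with external_getnext, which is covered in "Greenplum Database External Tables: Fetching Tuples with external_getnext". The ExternalSelectDescData structure is passed down through that function, so predicate pushdown is only possible with custom external tables.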

static TupleTableSlot *ExternalNext(ExternalScanState *node){
  bool    scanNext = true;  
  EState     *estate = node->ss.ps.state; /* get information from the estate and scan state */
  FileScanDesc scandesc = node->ess_ScanDesc;
  ScanDirection direction = estate->es_direction;
  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
  
  ExternalSelectDesc externalSelectDesc = external_getnext_init(&(node->ss.ps));
  if (gp_external_enable_filter_pushdown)
    externalSelectDesc->filter_quals = node->ss.ps.plan->qual;
    
  /* get the next tuple from the file access methods */
  /* The loop normally runs once; it repeats only when a tuple fails the table's constraints and is skipped. */
  while(scanNext){
    HeapTuple tuple = external_getnext(scandesc, direction, externalSelectDesc);
    if (tuple){
      ExecStoreHeapTuple(tuple, slot, InvalidBuffer, true);
      /* drop rows that violate the external table's CHECK constraints and fetch the next one */
      if (node->ess_ScanDesc->fs_hasConstraints && !ExternalConstraintCheck(slot, node)){
        ExecClearTuple(slot);
        continue;
      }
      /* CDB: Label each row with a synthetic ctid if needed for subquery dedup.  */
      if (node->cdb_want_ctid && !TupIsNull(slot)){ slot_set_ctid_from_fake(slot, &node->cdb_fake_ctid); }
    }else{
      /* end of data: return an empty slot and release scan resources early unless told to delay */
      ExecClearTuple(slot);
      if (!node->delayEagerFree){ ExecEagerFreeExternalScan(node); }
    }
    scanNext = false;
  }
  pfree(externalSelectDesc);
  return slot;
}      
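The control flow of ExternalNext can be modeled on its own. A rejected row sends the loop around again, while end of data triggers an eager free. In this sketch, SketchScan, SketchCheck, sketch_external_next and the integer rows are hypothetical stand-ins for the scan descriptor, ExternalConstraintCheck, external_getnext and heap tuples. The freed flag only records that the eager-free step would have run.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical scan over an in-memory array; a real scan calls external_getnext(). */
typedef struct {
    const int *rows;
    size_t     nrows;
    size_t     pos;
    bool       freed;    /* records that the eager-free step would have run */
} SketchScan;

/* Hypothetical constraint check, playing the role of ExternalConstraintCheck(). */
typedef bool (*SketchCheck)(int row);

/*
 * Store the next row that passes `check` in *out and return true, or return
 * false at end of data. Rejected rows are skipped by looping again; on
 * exhaustion the scan is marked freed unless delay_eager_free is set.
 */
bool sketch_external_next(SketchScan *scan, SketchCheck check,
                          bool delay_eager_free, int *out)
{
    assert(scan != NULL && out != NULL);
    for (;;)
    {
        if (scan->pos >= scan->nrows)
        {
            if (!delay_eager_free)
                scan->freed = true;
            return false;
        }
        int row = scan->rows[scan->pos++];
        if (check != NULL && !check(row))
            continue;            /* like ExecClearTuple() followed by continue */
        *out = row;
        return true;
    }
}

/* Example constraint: accept only non-negative values. */
bool sketch_is_non_negative(int row)
{
    return row >= 0;
}
```

Writing the loop as `for (;;)` with explicit returns makes the single-pass intent easier to see than a `scanNext` flag. Both shapes behave the same way.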

ExecReScanExternal works as shown below. It simply calls external_rescan, which in turn calls external_stopscan.

ExecReScanExternal
 | -- FileScanDesc fileScan = node->ess_ScanDesc
 | -- external_rescan
      | -- external_stopscan      

ExecSquelchExternalScan performs identically to ExecEndExternalScan, except that closure errors are ignored. It is called for normal termination when the external data source is NOT exhausted, for example because of a LIMIT clause. The call chain is:

ExecSquelchNode --> ExecSquelchExternalScan --> external_stopscan

ExecEndExternalScan mainly releases any storage allocated by the steps above. This includes node->ss.ps, node->ss.ps.ps_ResultTupleSlot, node->ss.ss_ScanTupleSlot and node->ess_ScanDesc. It also closes the relation and runs ExecEagerFreeExternalScan.

ExecEndExternalScan/ExternalNext/ExecSquelchExternalScan
 | -- ExecEagerFreeExternalScan
       | -- external_endscan      
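The tree shows ExecEagerFreeExternalScan being reached from three callers. For example, ExternalNext may run it at end of data, and ExecEndExternalScan may run it again later. Cleanup of this kind therefore has to tolerate running more than once for the same scan. One common way to get that is to clear the pointer after releasing it. This is shown here as a generic sketch, not as Greenplum's actual code; SketchScanState, SketchScanDesc and sketch_eager_free are hypothetical.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical scan descriptor owning one heap buffer. */
typedef struct {
    int *buffer;
} SketchScanDesc;

/* Hypothetical scan state; free_count only exists to make the behavior observable. */
typedef struct {
    SketchScanDesc *desc;
    int             free_count;
} SketchScanState;

/* Release the descriptor once; later calls see NULL and do nothing. */
void sketch_eager_free(SketchScanState *node)
{
    assert(node != NULL);
    if (node->desc == NULL)
        return;                  /* an earlier caller already released it */
    free(node->desc->buffer);    /* free(NULL) is a no-op, so a failed malloc is fine */
    free(node->desc);
    node->desc = NULL;
    node->free_count++;
}
```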

INSERT

The INSERT path into an external table consists of three functions: external_insert_init, external_insert and external_insert_finish. All three mainly work with the ExternalInsertDescData structure shown below.

CopyFrom/ExecInsert
 | -- external_insert_init
CopyFrom/ExecInsert
 | -- external_insert
CopyFrom/ExecEndPlan
 | -- CloseResultRelInfo 
       | -- external_insert_finish      
[Figure: the ExternalInsertDescData structure]

ExternalInsertDesc external_insert_init(Relation rel)

This function first allocates an ExternalInsertDesc and sets ext_rel to rel. On the QD, ext_noop is set to true; on a QE, it is set to false.

For an EXECUTE external table, ext_uri is set to the string "execute:" followed by extentry->command. Otherwise the table is gpfdist or custom, and each segment picks its own URI from extentry->urilocations with the following expression and stores it in extInsertDesc->ext_uri:

int my_url = segindex % num_urls

Memory is then allocated for ext_pstate, ext_tupDesc, ext_values and ext_nulls. If fmtcode is custom, the format options are parsed with:

parseCustomFormatString(extentry->fmtopts, &custom_formatter_name, &custom_formatter_params)

Otherwise they are parsed with:

copyFmtOpts = parseCopyFormatString(rel, extentry->fmtopts, extentry->fmtcode)

extInsertDesc->ext_pstate is initialized with:

BeginCopyToForeignTable(rel, copyFmtOpts)

It is then initialized further with:

InitParseState(extInsertDesc->ext_pstate, rel, true, extentry->fmtcode, extInsertDesc->ext_uri, extentry->rejectlimit, (extentry->rejectlimittype == 'r'), extentry->logerrors, extentry->options)

Finally, for a custom external table, the OID of the formatter function is looked up by custom_formatter_name and stored in ext_custom_formatter_func, and a FormatterData is allocated for ext_formatter_data.
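The two ways of setting ext_uri described above can be illustrated with a couple of small helpers. Both helpers are hypothetical. The round-robin index follows `int my_url = segindex % num_urls` from the text. The "execute:" prefix follows the description of EXECUTE tables, and building it with snprintf is an assumption made for the sketch.

```c
#include <assert.h>
#include <stdio.h>

/* Round-robin URI choice for a writable gpfdist/custom table:
 * with fewer URIs than segments, several segments share one URI. */
int sketch_writable_url_index(int segindex, int num_urls)
{
    assert(segindex >= 0 && num_urls > 0);
    return segindex % num_urls;
}

/* Hypothetical helper building "execute:<command>" for an EXECUTE table.
 * Returns what snprintf returns: the length of the full string. */
int sketch_execute_uri(char *buf, size_t buflen, const char *command)
{
    assert(buf != NULL && command != NULL);
    return snprintf(buf, buflen, "execute:%s", command);
}
```

With two URIs, for example, segments 0, 2, 4, ... write to the first one and segments 1, 3, 5, ... write to the second.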

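Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup)

This function formats a tuple and writes it to the external data source. It differs from heap_insert in three ways:

- WAL is bypassed.
- Transaction information is ignored.
- The tuple is always sent to a destination (a local file or a remote target).

Its main steps are:

1. Open the external data source if it is not open yet.
2. Deconstruct the tuple and format it into the external source's data format.
3. Call external_senddata to send the data.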
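For the TEXT path, the format, send and reset cycle in external_insert can be modeled with a fixed buffer. This sketch makes several simplifying assumptions:

- Values are already strings.
- The delimiter is a tab and NULL is written as \N (the TEXT defaults).
- No escaping is done.
- The sink is a hypothetical in-memory stand-in for external_senddata.

SketchMsgBuf plays the role of pstate->fe_msgbuf.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for pstate->fe_msgbuf: a fixed, NUL-terminated buffer. */
typedef struct {
    char   data[256];
    size_t len;
} SketchMsgBuf;

/* Hypothetical in-memory destination standing in for external_senddata(). */
typedef struct {
    char   out[1024];
    size_t len;
} SketchSink;

/* Append s to the buffer, keeping it NUL-terminated (no growth in this sketch). */
static void sketch_append(SketchMsgBuf *buf, const char *s)
{
    size_t n = strlen(s);

    assert(buf->len + n < sizeof buf->data);
    memcpy(buf->data + buf->len, s, n + 1);     /* n + 1 copies the terminator */
    buf->len += n;
}

/* Format one row as TEXT: tab between columns, \N for NULL, newline at the end.
 * Unlike real COPY output, no escaping of tabs, newlines or backslashes is done. */
void sketch_format_row(SketchMsgBuf *buf, const char *const *values,
                       const bool *nulls, int natts)
{
    for (int i = 0; i < natts; i++)
    {
        if (i > 0)
            sketch_append(buf, "\t");
        sketch_append(buf, nulls[i] ? "\\N" : values[i]);
    }
    sketch_append(buf, "\n");                   /* end of row */
}

/* Hand the formatted bytes to the sink, then reset the buffer the same way
 * external_insert() does: len = 0 and data[0] = '\0'. */
void sketch_send_and_reset(SketchMsgBuf *buf, SketchSink *sink)
{
    assert(sink->len + buf->len < sizeof sink->out);
    memcpy(sink->out + sink->len, buf->data, buf->len + 1);
    sink->len += buf->len;
    buf->len = 0;
    buf->data[0] = '\0';
}
```

Resetting the buffer after every send keeps each row's bytes independent. Nothing from the previous row can leak into the next one.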

Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup){
  TupleDesc tupDesc = extInsertDesc->ext_tupDesc;
  Datum    *values = extInsertDesc->ext_values;
  bool     *nulls = extInsertDesc->ext_nulls;
  CopyStateData *pstate = extInsertDesc->ext_pstate;
  bool    customFormat = (extInsertDesc->ext_custom_formatter_func != NULL);

  if (extInsertDesc->ext_noop) return InvalidOid;
  /* Open our output file or output stream if not yet open */
  if (!extInsertDesc->ext_file && !extInsertDesc->ext_noop) open_external_writable_source(extInsertDesc);

  /* deconstruct the tuple and format it into text */
  if (!customFormat){/* TEXT or CSV */
    heap_deform_tuple(instup, tupDesc, values, nulls);
    CopyOneRowTo(pstate, HeapTupleGetOid(instup), values, nulls);
    CopySendEndOfRow(pstate);
  }else{ /* custom format. convert tuple using user formatter */
    FunctionCallInfoData fcinfo;
    Datum     d;
    bytea    *b;
    /* There is some redundancy between FormatterData and ExternalInsertDesc we may be able to consolidate data structures a little. */
    FormatterData *formatter = extInsertDesc->ext_formatter_data;
    /* per call formatter prep */
    FunctionCallPrepareFormatter(&fcinfo, 1, pstate,
                                 extInsertDesc->ext_custom_formatter_func,
                                 extInsertDesc->ext_custom_formatter_params,
                                 formatter, extInsertDesc->ext_rel,
                                 extInsertDesc->ext_tupDesc,
                                 pstate->out_functions, NULL);
    /* Mark the correct record type in the passed tuple */
    HeapTupleHeaderSetDatumLength(instup->t_data, instup->t_len);
    HeapTupleHeaderSetTypeId(instup->t_data, tupDesc->tdtypeid);
    HeapTupleHeaderSetTypMod(instup->t_data, tupDesc->tdtypmod);
    fcinfo.arg[0] = HeapTupleGetDatum(instup);
    fcinfo.argnull[0] = false;
    d = FunctionCallInvoke(&fcinfo);
    MemoryContextReset(formatter->fmt_perrow_ctx);

    /* We do not expect a null result */
    if (fcinfo.isnull) elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);

    b = DatumGetByteaP(d);
    CopyOneCustomRowTo(pstate, b);
  }
  
  external_senddata(extInsertDesc->ext_file, pstate); /* Write the data into the external source */

  /* Reset our buffer to start clean next round */
  pstate->fe_msgbuf->len = 0;
  pstate->fe_msgbuf->data[0] = '\0';

  return HeapTupleGetOid(instup);
}      
void external_insert_finish(ExternalInsertDesc extInsertDesc){
  /* Close the external source */
  if (extInsertDesc->ext_file){
    char     *relname = RelationGetRelationName(extInsertDesc->ext_rel);
    url_fflush(extInsertDesc->ext_file, extInsertDesc->ext_pstate);
    url_fclose(extInsertDesc->ext_file, true, relname);
  }
  if (extInsertDesc->ext_formatter_data) pfree(extInsertDesc->ext_formatter_data);
  pfree(extInsertDesc);
}      

For reference, the external table's entry (extentry) is read from the pg_exttable catalog, which is defined as follows:

#define ExtTableRelationId  6040
CATALOG(pg_exttable,6040) BKI_WITHOUT_OIDS
{
  Oid   reloid;       /* refers to this relation's oid in pg_class  */
  text  urilocation[1];   /* array of URI strings */
  text  execlocation[1];  /* array of ON locations */
  char  fmttype;      /* 't' (text) or 'c' (csv) */
  text  fmtopts;      /* the data format options */
  text  options[1];     /* the array of external table options */
  text  command;      /* the command string to EXECUTE */
  int32 rejectlimit;    /* error count reject limit per segment */
  char  rejectlimittype;  /* 'r' (rows) or 'p' (percent) */
  bool  logerrors;      /* 't' to log errors into file */
  int32 encoding;     /* character encoding of this external table */
  bool  writable;     /* 't' if writable, 'f' if readable */
} FormData_pg_exttable;