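This post focuses on src/backend/access/external/fileam.c, which encapsulates the low-level details of fetching data and flow control and provides an abstract API (SCAN and INSERT functionality) to the executor nodes above it.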
SCAN
ExecInitExternalScan
| -- external_beginscan
ExecExternalScan
| -- ExternalNext
| -- external_getnext_init
| -- external_getnext
ExecReScanExternal
| -- external_rescan
| -- external_stopscan
ExecSquelchExternalScan
| -- external_stopscan
ExecEndExternalScan/ExternalNext/ExecSquelchExternalScan
| -- ExecEagerFreeExternalScan
| -- external_endscan
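The overall flow of ExecInitExternalScan was described in the earlier post "Greenplum Database External Tables: the Scan Execution Node". Here we focus mainly on the following call: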
currentScanDesc = external_beginscan(currentRelation, node->scancounter, node->uriList, node->fmtOptString, node->fmtType, node->isMasterOnly, node->rejLimit, node->rejLimitInRows, node->logErrors, node->encoding)
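Its main job is to create a FileScanDesc structure and store it in the ExternalScanState.ess_ScanDesc member, as highlighted by the yellow box in the figure. The function first allocates memory for a FileScanDescData and initializes its members: fs_inited is set to false to indicate that the FileScanDesc is not yet fully initialized; fs_rd is set to the relation parameter; fs_scancounter is set to the scancounter parameter; fs_noop indicates that this segment has no data to fetch (i.e. no operation); fs_file is set to NULL; and raw_buf_done is only used by custom external tables. If
relation->rd_att->constr != NULL && relation->rd_att->constr->num_check > 0
holds, the external table has constraints, and the constraint expressions are stored in the fs_constraintExprs member. Initialization then depends on the process role. On a QE, the URI for this segment is taken from uriList according to segindex and stored in fs_uri; on the QD, if isMasterOnly is set, the table is an ON MASTER table, and its URI is likewise stored in fs_uri. Next, for each column of the external table, the input function and element type are looked up in the system catalogs and saved in the in_functions and typioparams arrays. For the custom format, custom_formatter_name and custom_formatter_params are extracted from the fmtOptString parameter; otherwise parseCopyFormatString parses fmtOptString into copyFmtOpts. Then
BeginCopyFrom(relation, NULL, false, external_getdata_callback, (void *) scan, NIL, copyFmtOpts, NIL)
initializes the fs_pstate member. The table's entry is then fetched from the pg_exttable catalog, and
InitParseState(scan->fs_pstate, relation, false, fmtType, scan->fs_uri, rejLimit, rejLimitInRows, logErrors, extentry->options)
initializes fs_pstate further. Finally, for a custom external table, the formatter function's OID is looked up via custom_formatter_name and used to set fs_custom_formatter_func, and FormatterData memory is allocated for fs_formatter.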
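The role-dependent part of external_beginscan boils down to "which URI, if any, does this process read?". The sketch below models only that decision in plain C. MockFileScanDesc and mock_beginscan are hypothetical names, and two details are assumptions of the sketch rather than facts taken from fileam.c: a NULL entry in the URI list marks a segment without a location, and an ON MASTER table reads the first entry. Any participant left without a URI becomes a no-op scan (fs_noop).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, trimmed-down stand-in for FileScanDescData:
 * only the fields discussed above are modeled. */
typedef struct MockFileScanDesc {
    bool        fs_inited;  /* false until the scan is fully set up */
    bool        fs_noop;    /* true: this participant has nothing to read */
    const char *fs_uri;     /* URI this participant reads, or NULL */
} MockFileScanDesc;

/* is_qe:        true on a segment (QE), false on the coordinator (QD)
 * segindex:     the segment's index (ignored on the QD)
 * uris:         the table's URI list; a NULL entry means "no URI for
 *               this segment" (an assumption of this sketch)
 * master_only:  true for an ON MASTER table */
static MockFileScanDesc
mock_beginscan(bool is_qe, int segindex, const char *const *uris,
               int num_uris, bool master_only)
{
    MockFileScanDesc scan = { .fs_inited = false, .fs_noop = false, .fs_uri = NULL };

    assert(num_uris >= 0);

    if (is_qe) {
        /* QE: pick the entry that belongs to this segment, if any */
        if (segindex >= 0 && segindex < num_uris && uris[segindex] != NULL)
            scan.fs_uri = uris[segindex];
    } else if (master_only && num_uris > 0) {
        /* QD with ON MASTER: assumed to read the first location */
        scan.fs_uri = uris[0];
    }

    /* No URI means this participant fetches nothing: a no-op scan. */
    scan.fs_noop = (scan.fs_uri == NULL);
    return scan;
}
```

With three URIs where the second is NULL, segment 0 reads the first URI, while segment 1 and any segment beyond the list end up as no-op scans.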
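ExternalNext first calls external_getnext_init to create an ExternalSelectDescData structure. If
node->ss.ps
is not NULL, its ps_ProjInfo is stored in the projInfo member of ExternalSelectDesc; in addition, when gp_external_enable_filter_pushdown is enabled, the plan's qual list is stored in filter_quals. In other words, projection (projInfo) and filtering (filter_quals) can be pushed down into external_getnext. Tuples are fetched with external_getnext (covered in the earlier post "Greenplum Database External Tables: Fetching Tuples with external_getnext"), and the ExternalSelectDescData structure is passed down through that function; as a result, predicate pushdown is only available for custom external tables.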
static TupleTableSlot *
ExternalNext(ExternalScanState *node)
{
    bool            scanNext = true;
    /* get information from the estate and scan state */
    EState         *estate = node->ss.ps.state;
    FileScanDesc    scandesc = node->ess_ScanDesc;
    ScanDirection   direction = estate->es_direction;
    TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
    /* carries projInfo (and, optionally, filter_quals) down to external_getnext */
    ExternalSelectDesc externalSelectDesc = external_getnext_init(&(node->ss.ps));

    if (gp_external_enable_filter_pushdown)
        externalSelectDesc->filter_quals = node->ss.ps.plan->qual;

    /* get the next tuple from the file access methods */
    while (scanNext)
    {
        HeapTuple tuple = external_getnext(scandesc, direction, externalSelectDesc);

        if (tuple)
        {
            ExecStoreHeapTuple(tuple, slot, InvalidBuffer, true);
            /* rows that violate a CHECK constraint are skipped, not returned */
            if (node->ess_ScanDesc->fs_hasConstraints &&
                !ExternalConstraintCheck(slot, node))
            {
                ExecClearTuple(slot);
                continue;
            }
            /* CDB: Label each row with a synthetic ctid if needed for subquery dedup. */
            if (node->cdb_want_ctid && !TupIsNull(slot))
                slot_set_ctid_from_fake(slot, &node->cdb_fake_ctid);
        }
        else
        {
            /* end of data: return an empty slot and release resources early */
            ExecClearTuple(slot);
            if (!node->delayEagerFree)
                ExecEagerFreeExternalScan(node);
        }
        scanNext = false;
    }
    pfree(externalSelectDesc);
    return slot;
}
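The control flow of ExternalNext is easier to see with the Greenplum types stripped away. In the hypothetical sketch below, an int array plays the role of the external source, mock_constraint_ok stands in for ExternalConstraintCheck (here arbitrarily "value >= 0"), and a false return models the cleared slot at end of data. Rows that fail the check are skipped via continue, so one call returns the next row that passes, or signals EOF.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical row source standing in for external_getnext(). */
typedef struct MockSource {
    const int *rows;
    size_t     nrows;
    size_t     pos;
} MockSource;

/* Stores the next raw row in *out and returns true, or returns false at EOF. */
static bool
mock_getnext(MockSource *src, int *out)
{
    if (src->pos >= src->nrows)
        return false;
    *out = src->rows[src->pos++];
    return true;
}

/* Stand-in for ExternalConstraintCheck(): the "constraint" is value >= 0. */
static bool
mock_constraint_ok(int value)
{
    return value >= 0;
}

/* Mirrors ExternalNext: keep pulling rows until one passes the constraint
 * check or the source is exhausted. Returns false for "empty slot". */
static bool
mock_external_next(MockSource *src, bool has_constraints, int *slot)
{
    int row;

    assert(src != NULL && slot != NULL);
    while (mock_getnext(src, &row)) {
        if (has_constraints && !mock_constraint_ok(row))
            continue;            /* like ExecClearTuple(slot) + continue */
        *slot = row;
        return true;             /* like scanNext = false with a filled slot */
    }
    return false;                /* EOF: cleared slot; eager free would run here */
}
```

For the input {-1, 5, -2, -3, 7} with constraints enabled, successive calls return 5, then 7, then EOF.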
ExecReScanExternal works as follows: it essentially just calls external_rescan, which in turn calls external_stopscan.
ExecReScanExternal
| -- FileScanDesc fileScan = node->ess_ScanDesc
| -- external_rescan
| -- external_stopscan
ExecSquelchExternalScan performs identically to ExecEndExternalScan, except that errors raised while closing the source are ignored. It is called for normal termination when the external data source has NOT been exhausted (for example, because of a LIMIT clause). Its call path is:
ExecSquelchNode --> ExecSquelchExternalScan --> external_stopscan
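The only difference between the squelch path and the end path is whether an error on close is reported. A minimal sketch of that switch follows, with hypothetical names throughout. One plausible reason for ignoring errors when squelching is that stopping a source early (for example, an EXECUTE command or a gpfdist transfer that still has data to send) can itself produce a close-time error that is not a real failure. The true passed as the second argument of url_fclose in external_insert_finish looks like the "fail on error" form of the same idea, though that reading is an assumption here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-in for an external source handle. */
typedef struct MockExtFile {
    bool open;
    bool close_fails;   /* simulate the other side reporting an error on close */
} MockExtFile;

/* Closes the source; returns 0 on success, -1 on a reported error.
 * fail_on_error chooses between "report" and "ignore". */
static int
mock_close(MockExtFile *f, bool fail_on_error)
{
    assert(f != NULL);
    f->open = false;
    if (f->close_fails && fail_on_error) {
        fprintf(stderr, "error closing external source\n");
        return -1;      /* the real executor would ereport(ERROR) instead */
    }
    return 0;           /* close errors are swallowed when fail_on_error is false */
}

/* End of scan after the data was exhausted: closure errors are reported. */
static int mock_endscan(MockExtFile *f)  { return mock_close(f, true); }

/* Squelch (e.g. LIMIT satisfied early): same cleanup, errors ignored. */
static int mock_stopscan(MockExtFile *f) { return mock_close(f, false); }
```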
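ExecEndExternalScan mainly releases any storage allocated in the steps above, such as node->ss.ps, node->ss.ps.ps_ResultTupleSlot, node->ss.ss_ScanTupleSlot and node->ess_ScanDesc; it also closes the relation and calls ExecEagerFreeExternalScan. The eager-free path, shared by ExecEndExternalScan, ExternalNext and ExecSquelchExternalScan, is: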
ExecEndExternalScan/ExternalNext/ExecSquelchExternalScan
| -- ExecEagerFreeExternalScan
| -- external_endscan
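Because ExecEagerFreeExternalScan is reachable from three callers (ExternalNext at end of data, ExecSquelchExternalScan and ExecEndExternalScan), releasing the scan descriptor has to be safe to request more than once. A common way to get that is to free the resource, set the pointer to NULL, and return early when it is already NULL. The sketch below shows this pattern with hypothetical mock types; it is an assumed design, not code copied from the Greenplum executor.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-ins for FileScanDesc and ExternalScanState. */
typedef struct MockScanDesc { int dummy; } MockScanDesc;

typedef struct MockScanState {
    MockScanDesc *ess_ScanDesc;
    int           endscan_calls;   /* how often the descriptor was really freed */
} MockScanState;

/* Stand-in for external_endscan(): releases the scan descriptor. */
static void
mock_endscan(MockScanState *node)
{
    free(node->ess_ScanDesc);
    node->endscan_calls++;
}

/* Guarded eager free: any of the three callers may invoke it, in any
 * order and any number of times, without a double free. */
static void
mock_eager_free(MockScanState *node)
{
    assert(node != NULL);
    if (node->ess_ScanDesc == NULL)
        return;                     /* already released by an earlier caller */
    mock_endscan(node);
    node->ess_ScanDesc = NULL;
}
```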
INSERT
Inserting into an external table involves three functions: external_insert_init, external_insert and external_insert_finish. All three mainly operate on the ExternalInsertDescData structure. Their call paths are:
CopyFrom/ExecInsert
| -- external_insert_init
CopyFrom/ExecInsert
| -- external_insert
CopyFrom/ExecEndPlan
| -- CloseResultRelInfo
| -- external_insert_finish
ExternalInsertDesc external_insert_init(Relation rel)
The function first allocates memory for the ExternalInsertDesc and sets ext_rel to the rel parameter. On the QD, ext_noop is set to true; on a QE, it is set to false. For an EXECUTE external table, ext_uri is set to the string "execute:" followed by extentry->command; otherwise (gpfdist or custom), each segment picks its own URI from extentry->urilocations using
int my_url = segindex % num_urls
and stores it in extInsertDesc->ext_uri. Memory is then allocated for ext_pstate, ext_tupDesc, ext_values and ext_nulls. If fmtcode is custom, fmtopts is parsed with
parseCustomFormatString(extentry->fmtopts, &custom_formatter_name, &custom_formatter_params)
otherwise with
copyFmtOpts = parseCopyFormatString(rel, extentry->fmtopts, extentry->fmtcode)
Next,
BeginCopyToForeignTable(rel, copyFmtOpts)
initializes extInsertDesc->ext_pstate, and
InitParseState(extInsertDesc->ext_pstate, rel, true, extentry->fmtcode, extInsertDesc->ext_uri, extentry->rejectlimit, (extentry->rejectlimittype == 'r'), extentry->logerrors, extentry->options)
initializes the ext_pstate member further. Finally, for a custom external table, the formatter function's OID is looked up via custom_formatter_name and used to set ext_custom_formatter_func, and FormatterData memory is allocated for ext_formatter_data.
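The URI choice in external_insert_init can be tried out in isolation. The hypothetical helper below reproduces the two cases described above: an EXECUTE table gets "execute:" followed by its command, and any other writable table lets segment segindex write to urilocations[segindex % num_urls], so segments wrap around when there are fewer URLs than segments. Returning false models ext_noop on the QD; the function name and parameters are illustrative only.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper mirroring the URI choice in external_insert_init.
 * Writes the chosen URI into buf and returns true, or returns false when
 * this process is a no-op writer (the QD). command is NULL for non-EXECUTE tables. */
static bool
mock_insert_uri(bool is_qd, int segindex, const char *command,
                const char *const *urls, int num_urls,
                char *buf, size_t bufsize)
{
    int n;

    if (is_qd)
        return false;                      /* ext_noop = true on the QD */

    if (command != NULL) {
        /* EXECUTE table: ext_uri = "execute:" + command */
        n = snprintf(buf, bufsize, "execute:%s", command);
    } else {
        assert(num_urls > 0 && segindex >= 0);
        int my_url = segindex % num_urls;  /* segments wrap around the URL list */
        n = snprintf(buf, bufsize, "%s", urls[my_url]);
    }
    assert(n >= 0 && (size_t) n < bufsize); /* this sketch refuses to truncate */
    return true;
}
```

For example, with two URLs, segment 3 writes to the second URL (3 % 2 = 1) and segment 4 wraps back to the first (4 % 2 = 0).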
Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup)
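This function formats a tuple and writes it to the external data source. It differs from heap_insert in three respects: WAL is bypassed; transaction information is irrelevant; and the tuple is always sent to a destination (a local file or a remote target). Its main steps are: open the external data source if it is not open yet; deconstruct the tuple and convert it into the external source's data format; and call external_senddata to send the data.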
Oid
external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup)
{
    TupleDesc      tupDesc = extInsertDesc->ext_tupDesc;
    Datum         *values = extInsertDesc->ext_values;
    bool          *nulls = extInsertDesc->ext_nulls;
    CopyStateData *pstate = extInsertDesc->ext_pstate;
    bool           customFormat = (extInsertDesc->ext_custom_formatter_func != NULL);

    /* the QD (ext_noop) never writes anything */
    if (extInsertDesc->ext_noop)
        return InvalidOid;

    /* Open our output file or output stream if not yet open */
    if (!extInsertDesc->ext_file && !extInsertDesc->ext_noop)
        open_external_writable_source(extInsertDesc);

    /* deconstruct the tuple and format it into text */
    if (!customFormat)
    {
        /* TEXT or CSV */
        heap_deform_tuple(instup, tupDesc, values, nulls);
        CopyOneRowTo(pstate, HeapTupleGetOid(instup), values, nulls);
        CopySendEndOfRow(pstate);
    }
    else
    {
        /* custom format. convert tuple using user formatter */
        FunctionCallInfoData fcinfo;
        /* There is some redundancy between FormatterData and ExternalInsertDesc;
         * we may be able to consolidate data structures a little. */
        FormatterData *formatter = extInsertDesc->ext_formatter_data;
        Datum          d;
        bytea         *b;

        /* per call formatter prep */
        FunctionCallPrepareFormatter(&fcinfo, 1, pstate,
                                     extInsertDesc->ext_custom_formatter_func,
                                     extInsertDesc->ext_custom_formatter_params,
                                     formatter, extInsertDesc->ext_rel,
                                     extInsertDesc->ext_tupDesc,
                                     pstate->out_functions, NULL);

        /* Mark the correct record type in the passed tuple */
        HeapTupleHeaderSetDatumLength(instup->t_data, instup->t_len);
        HeapTupleHeaderSetTypeId(instup->t_data, tupDesc->tdtypeid);
        HeapTupleHeaderSetTypMod(instup->t_data, tupDesc->tdtypmod);

        fcinfo.arg[0] = HeapTupleGetDatum(instup);
        fcinfo.argnull[0] = false;

        d = FunctionCallInvoke(&fcinfo);
        MemoryContextReset(formatter->fmt_perrow_ctx);

        /* We do not expect a null result */
        if (fcinfo.isnull)
            elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);

        b = DatumGetByteaP(d);
        CopyOneCustomRowTo(pstate, b);
    }

    /* Write the data into the external source */
    external_senddata(extInsertDesc->ext_file, pstate);

    /* Reset our buffer to start clean next round */
    pstate->fe_msgbuf->len = 0;
    pstate->fe_msgbuf->data[0] = '\0';

    return HeapTupleGetOid(instup);
}
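Resetting fe_msgbuf at the end of external_insert matters because the same buffer is reused for every row: without the reset, each external_senddata call would resend all earlier rows as well. The sketch below models that per-row cycle (format, send, reset) with a fixed-size buffer. MockBuf, MockSink and the helpers are hypothetical, the '|' delimiter and \N null marker are chosen for readability, and the real CopyOneRowTo also handles escaping, quoting and encoding, which are omitted here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical fixed-size stand-in for pstate->fe_msgbuf (a StringInfo). */
typedef struct MockBuf {
    char   data[256];
    size_t len;
} MockBuf;

/* Hypothetical destination: only counts what it receives. */
typedef struct MockSink {
    size_t bytes_sent;
    int    sends;
} MockSink;

static void
mock_append(MockBuf *buf, const char *s)
{
    size_t n = strlen(s);

    assert(buf->len + n < sizeof buf->data);   /* no growth in this sketch */
    memcpy(buf->data + buf->len, s, n + 1);    /* copies the terminating NUL too */
    buf->len += n;
}

/* Simplified text-format row: fields joined by '|', NULL as \N, then newline. */
static void
mock_copy_one_row(MockBuf *buf, const char *const *values, const bool *nulls, int ncols)
{
    for (int i = 0; i < ncols; i++) {
        if (i > 0)
            mock_append(buf, "|");
        mock_append(buf, nulls[i] ? "\\N" : values[i]);
    }
    mock_append(buf, "\n");
}

/* Stand-in for external_senddata(): "sends" the whole buffer. */
static void
mock_senddata(MockSink *sink, const MockBuf *buf)
{
    sink->bytes_sent += buf->len;
    sink->sends++;
}

static void
mock_insert_row(MockSink *sink, MockBuf *buf,
                const char *const *values, const bool *nulls, int ncols)
{
    mock_copy_one_row(buf, values, nulls, ncols);
    mock_senddata(sink, buf);
    buf->len = 0;          /* reset so the next row starts clean, */
    buf->data[0] = '\0';   /* as external_insert does */
}
```

Inserting the rows ("1", "alice") and ("2", NULL) sends 8 + 5 = 13 bytes in total; with the two reset lines removed, the second send would also repeat the first row, for 21 bytes.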
external_insert_finish flushes and closes the external source if it was opened, then frees the formatter data and the descriptor itself:
void
external_insert_finish(ExternalInsertDesc extInsertDesc)
{
    /* Close the external source (only opened if at least one row was written) */
    if (extInsertDesc->ext_file)
    {
        char *relname = RelationGetRelationName(extInsertDesc->ext_rel);

        url_fflush(extInsertDesc->ext_file, extInsertDesc->ext_pstate);
        url_fclose(extInsertDesc->ext_file, true, relname);
    }

    if (extInsertDesc->ext_formatter_data)
        pfree(extInsertDesc->ext_formatter_data);

    pfree(extInsertDesc);
}
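external_insert and external_insert_finish together implement a lazy open/close: the source is opened on the first row a segment actually writes (the ext_file == NULL check), the QD never opens anything because ext_noop is true, and finish only flushes and closes a source that was opened. The sketch below reproduces that lifecycle with standard C stdio, using tmpfile() as a stand-in for the external source. The type and functions are hypothetical, and unlike the real external_insert_finish it does not free the descriptor, which the caller owns here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical, trimmed-down ExternalInsertDesc. */
typedef struct MockInsertDesc {
    bool  ext_noop;   /* true on the QD: never writes */
    FILE *ext_file;   /* opened lazily on the first row */
    int   opens;      /* bookkeeping for the example */
} MockInsertDesc;

static void
mock_insert(MockInsertDesc *d, const char *line)
{
    assert(d != NULL);
    if (d->ext_noop)
        return;                          /* QD: nothing to send */
    if (d->ext_file == NULL) {           /* open on first use only */
        d->ext_file = tmpfile();
        if (d->ext_file == NULL) {
            perror("tmpfile");           /* the real code would raise an error */
            return;
        }
        d->opens++;
    }
    fputs(line, d->ext_file);
}

/* Flushes and closes the source if it was opened; returns the bytes written. */
static long
mock_insert_finish(MockInsertDesc *d)
{
    long written = 0;

    if (d->ext_file != NULL) {           /* only close what was opened */
        fflush(d->ext_file);             /* like url_fflush */
        written = ftell(d->ext_file);
        fclose(d->ext_file);             /* like url_fclose */
        d->ext_file = NULL;
    }
    return written;
}
```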
The pg_exttable catalog, from which both external_beginscan and external_insert_init read the table's entry (extentry), is defined as follows:
#define ExtTableRelationId 6040
CATALOG(pg_exttable,6040) BKI_WITHOUT_OIDS
{
    Oid     reloid;             /* refers to this relation's oid in pg_class */
    text    urilocation[1];     /* array of URI strings */
    text    execlocation[1];    /* array of ON locations */
    char    fmttype;            /* 't' (text) or 'c' (csv) */
    text    fmtopts;            /* the data format options */
    text    options[1];         /* the array of external table options */
    text    command;            /* the command string to EXECUTE */
    int32   rejectlimit;        /* error count reject limit per segment */
    char    rejectlimittype;    /* 'r' (rows) or 'p' (percent) */
    bool    logerrors;          /* 't' to log errors into file */
    int32   encoding;           /* character encoding of this external table */
    bool    writable;           /* 't' if writable, 'f' if readable */
} FormData_pg_exttable;
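The rejectlimit and rejectlimittype columns feed InitParseState, where (rejectlimittype == 'r') becomes the "limit in rows" flag. The sketch below shows how the two fields could drive a limit check. It is a simplified, hypothetical model: treating 'p' as a percentage of the rows processed so far, and stopping once the limit is reached, are assumptions of the sketch, and Greenplum's actual single-row error handling has additional rules that are not modeled here.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified reject-limit check driven by the pg_exttable
 * fields rejectlimit and rejectlimittype. */
static bool
mock_reject_limit_reached(int rejectlimit, char rejectlimittype,
                          long rejected, long processed)
{
    bool in_rows = (rejectlimittype == 'r');   /* the flag InitParseState receives */

    assert(in_rows || rejectlimittype == 'p');

    if (in_rows)
        return rejected >= rejectlimit;        /* absolute number of bad rows */
    if (processed == 0)
        return false;                          /* nothing processed yet */
    /* 'p': assumed to mean "percent of rows processed so far" */
    return rejected * 100 >= (long) rejectlimit * processed;
}
```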