天天看點

幹貨丨DolphinDB C++ API 資料寫入使用指南

作者:DolphinDB

本文為 DolphinDB C++ API (連接配接器)寫入接口的使用指南,使用者在有資料寫入需求時,可以根據本篇教程快速明确地選擇寫入方式。本文将從使用場景介紹、原理簡述、函數使用、場景實踐四部分進行具體闡述。

一、場景介紹

目前大資料技術已廣泛應用到金融、物聯網等行業,而海量資料的寫入是大資料處理和分析的基礎。在實際應用中,資料的産生方式和采集途徑多種多樣,DolphinDB 作為輕量級的大資料平台,提供多種資料寫入方式,使用者可以在不同應用場景下選擇最合适的方式進行資料寫入。

以工業物聯網場景為例,裝置資料的寫入場景通常可分為兩類。

(1)多裝置資料分散寫入。

如某廠區有100台裝置,每台裝置通過獨立的傳輸鍊路将資料一條條發送到 API 端,再統一通過 API 端寫入到 DolphinDB。

幹貨丨DolphinDB C++ API 資料寫入使用指南

多裝置資料分散寫入

(2)裝置資料彙總後再寫入 API。 如某廠區有100台裝置,用采集服務(如 Kafka)将裝置資料進行彙總,再統一通過 API 寫入 DolphinDB。

幹貨丨DolphinDB C++ API 資料寫入使用指南

裝置資料彙總寫入

針對以上場景,DolphinDB C++ API 提供了多種寫入方法,以實作不同來源資料的高效寫入:

裝置場景 寫入DolphinDB表的類型 調用DolphinDB 函數 實作方式
多裝置資料一條條分散寫入 ALL MTW(MultithreadedTableWriter) 緩沖行資料後并行寫入
裝置資料彙總寫入 ALL MTW(MultithreadedTableWriter) 将合并的行資料并行寫入
裝置資料彙總寫入 記憶體表 tableInsert 單表寫入
裝置資料彙總寫入 分布式表 PTA(PartitionedTableAppender) 多線程按列批量寫入
裝置資料彙總寫入 分布式表 AFTA(AutoFitTableAppender) 單線程按列批量寫入

MTW 的寫入方式可以适配多種寫入場景,推薦首次接觸 DolphinDB 的使用者使用 MTW 方法;

tableInsert 方法可以将彙總資料簡單快速地寫入記憶體表;

而對于分布式表,C++ API 提供了保障并行寫入的 PTA 方法,以及更簡單易用,能自動轉換寫入資料字段類型的 AFTA/AFTU 方法。

二、原理簡述

傳統的開發人員通常對關系型資料庫的行式存儲(Row-Based)比較熟悉,資料按單行或多行的方式送出并寫入,這種寫入方式很容易了解,但是基于行式存儲的資料庫實際上并不是為大資料處理而設計的,海量資料的寫入很容易遇到性能瓶頸。

DolphinDB 采用列式存儲(Column-Based),在記憶體中維護一個 Cache Engine,當資料寫入檔案時,并不是直接寫入到磁盤,而是先寫入作業系統的緩沖頁面中,再批量寫入磁盤。為了確定寫入資料不會在記憶體中丢失, DolphinDB 使用 WAL(Write Ahead Logging)的機制。詳情可參考 DolphinDB 使用者手冊的資料模型。

以一個5列(字段)的資料表為例,寫入100萬行的資料時,行式存儲按行方式送出并寫入,需要執行100萬次的檔案寫入操作;而列式存儲對單列進行寫入,可以按列一次性送出100萬個值,最少僅需5次檔案操作就能完成資料寫入。兩種寫入方式在海量資料的處理方面性能差異巨大。

幹貨丨DolphinDB C++ API 資料寫入使用指南

行式存儲和列式存儲對比(圖檔來源于網絡)

通常我們在為大資料應用場景規劃寫入方式前,需要了解以上列式存儲的寫入方式。按列批量寫入能最大化發揮列式存儲的優勢,而當實際場景下多個裝置寫入資料較為分散時,可以選擇有資料緩沖的 API 方法如 MTW,以獲得最佳寫入性能。

DolphinDB C++ API 支援多種資料寫入方法,涵蓋多樣化的寫入場景需求,主要特點如下:

寫入方式 特點
MTW

-官方推薦用法

-按行接收資料

-内置資料緩沖隊列

-多線程異步并發寫入

tableInsert

- 友善簡單,速度快

- 事務機制下,同一分區不能同時寫入兩條資料,是以不建議寫入分布式表

PTA

- 按表寫入

- 内置連接配接池

- 自動按分區同步并行寫入

AFTA

- 自動轉換字段類型寫入

- 适用于曆史資料整表落盤,追加寫入

- 單線程同步寫入

AFTU - AFTA 更新寫的版本
BatchTableWriter(舊版本)

- 因相容性而保留的舊版函數

- 實時資料落盤,資料按行寫入

- 單線程同步寫入

  • MTW 支援高效按行寫入,通過内置資料緩沖隊列,MTW 将資料統一發送到 DolphinDB ,可以保證單條資料的寫入效率,适用于多裝置一條條分散寫入場景。當性能要求不高時,也可用于第三方平台彙總資料後批量寫入 API 的場景。 MTW 是對 BatchTableWriter(舊版本)的更新,二者均支援資料分散地從第三方平台傳輸到用戶端的場景。MTW 的預設功能和 BatchTableWriter 一緻,但支援多線程的并發寫入。目前 BatchTableWriter 方式已經完全被 MTW 替代,僅因為相容性而保留。
  • tableInsert 使用簡單高效,可以支援資料彙總寫入場景,若寫入的 DolphinDB 表為記憶體表,可以選擇 tableInsert 或者 PTA;但 tableInsert 沒有分區寫入保障機制,在開啟事務機制的情況下,不建議寫入分布式表。
  • PTA 能夠自動按分區實作同步并行寫入,适用于資料彙總寫入場景。按列并行寫入的機制確定了 PTA 方式在批量寫入場景下擁有性能優勢。
  • AFTA 能夠自動将 C++ 字段類型轉換為 DolphinDB 字段類型完成寫入,使用上較 PTA 更為簡單,同樣适合資料彙總寫入場景。PTA 的寫入速度要好于 AFTA,在對寫入效率有要求且僅進行追加寫的情況下,建議優先考慮 PTA。
  • AFTU 是 AFTA 的更新寫版本,更适合于重複資料存在的場景,讀取新資料不存在重複時直接插入,存在重複時更新。針對資料寫入是否需要更新,即當寫入的資料在資料庫中已有相同的主鍵或者相同的指定字段時,選擇更新該條舊資料或者直接插入新資料,C++ API 給出了不同的寫入方式。其中,MTW 内部分别實作了更新寫和追加寫,以 mode 參數的形式提供選擇;而 PTA 僅提供了追加寫的方式。

MTW,PTA,AFTA,AFTU 四種方法涵蓋了絕大多數寫入場景,其底層實作均調用了 tableInsert 或 upsert! (DolphinDB 腳本函數,關于 tableInsert 的更多介紹請參考 https://www.dolphindb.cn/cn/help/FunctionsandCommands/FunctionReferences/t/tableInsert.html)。下節将重點介紹 MTW,PTA,AFTA,AFTU 四種函數的使用。

DolphinDB C++ API 的具體安裝教程可參考 https://gitee.com/dolphindb/api-cplusplus/blob/release200/README_CN.md

三、函數使用

1. MultithreadedTableWriter(MTW)

MTW 可以向記憶體表、流表、分區表、次元表中寫入資料。不僅在内部實作并發寫入,還可在 API 端建立多個 MTW 對象并發執行寫入任務,MTW 也支援整型、時間等類型的内部自動轉換。

MTW 在 API 端維護一個資料緩沖隊列,API 端可調用寫線程将資料按條持續寫入緩沖隊列,資料在緩沖隊列堆積到一定數量後将一并被傳送到伺服器端。用戶端建立出使用者指定數目的 DolphinDB 連接配接,然後按照分區配置設定寫入資料。在事務機制下,DolphinDB 不允許多個線程同時向同一個分區寫入資料。

首先建立一個 MTW 對象。建立 MTW 時可指定每列的壓縮方式。代碼如下:

vector<COMPRESS_METHOD> compress;
for(int i=0;i<102;i++)compress.push_back(COMPRESS_LZ4);   // 每列的壓縮方式
MultithreadedTableWriter writer(
      "127.0.0.1", 9900, "admin","123456","dfs://test_MultithreadedTableWriter","collect",NULL,false,NULL,1000,1,10,"deviceid", &compress);              

MTW 的構造函數參數詳見 README_CN.md · dolphindb/api-cplusplus - Gitee 。另外,dbName,partitionColumnName,threadCount 三個參數在寫入不同類型的表時有很大差別,具體見下表。

表類型 參數1:dbName 參數2:partitionColumnName 參數3:threadCount
記憶體表 “” 任意字段 >=1
流表 “” 任意字段 >=1
分區表 實際資料庫名 某個分區字段 >=1
次元表 實際資料庫名 “” 1

接着在 API 端建立子線程插入資料到緩沖隊列。當需要寫入的資料量較大時,可根據實際情況在 API 端使用更多的線程。insert 方法需傳入一個 ErrorCodeInfo 對象和一串變長參數,每個變長參數都代表一個字段值。

具體代碼如下:

int rows = 1000; //行數
int cols = 5;   //列數
vector<ConstantSP> datas;
TableSP bt = conn.run("t0 = loadText('"+DATA_FIRE+"');t0");// 模拟資料源從 csv 檔案導入
for(int i = 0; i< rows; ++i){
    for(int j = 0; j < cols; ++j)
        datas.emplace_back(bt->getColumn(j)->get(i));
}
// 建立線程
thread t([&]() {
    try {
        for(int i=0;i < bt->rows();i++){
           ErrorCodeInfo pErrorInfo;
           writer.insert(pErrorInfo,
                      datas[0], datas[1], datas[2], datas[3], datas[4] // 含5個字段的資料
           );
        }
    }catch (exception &e) {
         cerr << "MTW exit with exception: " << e.what() << endl;
    }
});
// 等待插入線程結束
t.join();           

在 MTW 運作時,可能會發生寫入錯誤,使用下述代碼,擷取對象目前的運作狀态。

MultithreadedTableWriter::Status status;
writer.getStatus(status);
if (status.hasError()) {
	cout << "error in writing: " << status.errorInfo << endl;
}           

status 對象的屬性和方法詳見 README_CN.md · dolphindb/api-cplusplus - Gitee 。

注意,API 端的寫入線程結束不代表 MTW 完全退出,需使用 waitForThreadCompletion 方法等待 MTW 完全退出。同時,MTW 需要在記憶體中緩存寫入資料,當 API 異常退出時可能會造成已緩存資料的丢失,是以需要合理配置緩存寫入數量,以适配性能和高可用場景需求。

當 MTW 寫入出現錯誤,需調用 getUnwrittenData 方法擷取未寫入資料。若有未寫入資料,則需再次建立 MTW 對象進行寫入。代碼如下:

writer.getStatus(status);
    if (status.hasError()) {
        cout << "error after write complete: " << status.errorInfo << endl;
        // 擷取未寫入的資料
        std::vector<std::vector<ConstantSP>*> unwrittenData;
        writer.getUnwrittenData(unwrittenData);
        cout << "unwriterdata length " << unwrittenData.size() << endl;
        if (!unwrittenData.empty()) {
            try {
                // 重新寫入這些資料,原有的 MTW 因為異常退出已經不能用了,需要建立新的 MTW
                cout << "create new MTW and write again." << endl;
                MultithreadedTableWriter newWriter("183.136.170.167", 9900, "admin", "123456", "dfs://test_MultithreadedTableWriter", "collect", NULL,false,NULL,1000,1,5,"deviceid", &compress);
                ErrorCodeInfo errorInfo;
                // 插入未寫入的資料
                if (newWriter.insertUnwrittenData(unwrittenData, errorInfo)) {
                    // 等待寫入完成後檢查狀态
                    newWriter.waitForThreadCompletion();
                    newWriter.getStatus(status);
                    if (status.hasError()) {
                        cout << "error in write again: " << status.errorInfo << endl;
                    }
                }
                else {
                    cout << "error in write again: " << errorInfo.errorInfo << endl;
                }
            }
            catch (exception &e) {
                cerr << "new MTW exit with exception: " << e.what() << endl;
            }
        }
    }           

至此,MTW 的使用流程介紹完畢。

2. PartitionedTableAppender(PTA)

PTA 設計一個連接配接池,擷取分布式表的分區資訊後,将分區配置設定給連接配接池來并行寫入。

PartitionedTableAppender 可向分布式表中寫入資料

PTA 的使用簡潔友善,建立一個連接配接池對象和 PTA 對象。注意,對象建立時需要指定寫入表的分區字段,盡量使分區個數與連接配接池中連接配接個數相同。因為線程多反而會增加線程建立和銷毀開銷,而線程少無法最大利用伺服器資源。當每個分區都能同時進行寫入并且沒有多餘的線程建立,PTA 寫入效率最高,資源配置設定也最合理。代碼如下:

DBConnectionPool pool("127.0.0.1", 9900, 5, "admin", "123456");
// 分區列傳入 deviceid 或 ts 均可,保證可以使用多線程寫入資料集多個分區,因為 DolphinDB 開啟事務時不允許多個 writer 同時寫入到一個分區内
PartitionedTableAppender appender("dfs://test_PartitionedTableAppender", "collect","deviceid", pool);  
appender.append(bt);
pool.shutDown();            

若目前連接配接池不再使用,會自動被釋放,但存在釋放延時,可以通過調用 shutDown() 等待線程任務執行結束後立即釋放連接配接。

3. AutoFitTableAppender(AFTA)

AFTA 建立與 server 的連接配接後,對列數、列字段名、列字段類型等基礎資訊進行判斷,完成整型、時間類型等字段的自動轉換,随即使用 tableInsert 進行寫入,目前 AFTA 與 AFTU 尚不支援整型與浮點型資料間的轉換。

AutoFitTableAppender 内部實作簡單,實用性高。可向流表、記憶體表、分區表、磁盤表寫入資料

AFTA 的使用較 PTA 更為簡單,建立完 AFTA 對象後即可調用 append() 寫入。具體代碼如下:

AutoFitTableAppender appender("dfs://test_AutoFitTableAppender", "collect", conn);
appender.append(bt);           

4. AutoFitTableUpsert(AFTU)

建立 AFTU 時可指定字段 keycolName,當新插入資料的指定字段不與資料庫中已有資料重複時,AFTU 直接将資料插入,而當該字段出現重複時,AFTU 可以對該條資料進行更新。

AutoFitTableUpsert 更适合于有重複資料寫入的場景

類似于 AFTA,通過建立的 AFTU,調用 upsert() 即可完成資料的寫入和更新:

vector<string> keycolName = {"id"};
AutoFitTableUpsert aftu("dfs://test_AutoFitTableUpsert", "collect", conn, false, &keycolName);
aftu.upsert(bt);           

以下就追加寫入和更新寫入場景提供了更全面的選擇參考:

資料寫入場景 字段類型自動比對 追加寫入 更新寫入
多裝置分散寫入 MTW MTW
資料彙總寫入 PTA、MTW MTW
資料彙總寫入 AFTA、MTW AFTU、MTW

在底層實作上,AFTA 和 AFTU 通過整表插入的方式實作批量資料寫入;而 MTW 通過維護資料緩沖隊列實作批量資料異步寫入。若需要使用 C++ API 實作整表資料的寫入,推薦使用 AFTA 或 AFTU。

四、場景實踐

以下場景案例展示了使用 DolphinDB C++ API 實作資料寫入的流程:

某裝置實驗平台有100台裝置,單台裝置有1000個測點,實驗平台需要采集裝置的測點資訊進而評估裝置的使用情況。

實驗平台要求測點資訊按單值模型存儲,每台裝置每隔5分鐘對所有1000個測點進行資料采集,彙總所有裝置的資料後通過消息中間件統一傳輸到 API 端。實驗平台要求支援對資料的批量寫入,同時保證資料類型的一緻性,不需要資料類型自動轉換;若用戶端意外崩潰,重新開機後 API 可重新接受資料。這種場景下采用 MTW 方法将實時資料寫入資料庫,其流程圖如下:

幹貨丨DolphinDB C++ API 資料寫入使用指南

實時資料落盤流程圖

資料集:

  • 記錄描述:100台裝置,每台1000個測點,采集頻率5分鐘1次,采集持續10天
  • 記錄行數:2.6億行
  • 磁盤占用:1116 MB
  • 字段數量:6
  • 字段樣式:
    • ts:數采時間
    • deviceCode:裝置編号
    • logicalPostionId:邏輯位置ID
    • physicalPostionId:實體位置ID
    • propertyCode:屬性測點編碼
    • propertyValue:測點值(累計産量)

準備工作:

首先要在 server 端建立分布式資料庫 db_demo、分區表 collect:

// 建立分布式資料庫及分區表
dbname="dfs://db_demo"
tablename="collect"
cols_info=`ts`deviceCdoe`logicalPostionId`physicalPostionId`propertyCode`propertyValue
cols_type=[DATETIME,SYMBOL,SYMBOL,SYMBOL,SYMBOL,INT]
t=table(1:0,cols_info,cols_type)
db=database(dbname,VALUE,[2022.11.01],engine=`TSDB)
pt=createPartitionedTable(db,t,tablename,`ts,,`deviceCdoe`ts)           

然後建立一張流資料表 streamtable,使用 MTW 方式将資料寫入這張流表,然後訂閱流表,資料将從流表流向分區表 collect:

// 建立流表
def saveToDFS(mutable dfstable, msg): dfstable.append!(msg)
share streamTable(1:0, cols_info, cols_type) as streamtable;
subscribeTable(tableName="streamtable", actionName="savetodfs", offset=0, handler=saveToDFS{pt}, msgAsTable=true, batchSize=1000, throttle=1)           

也可直接在 C++ 代碼中使用 conn.run(script) 的方式運作此段代碼。

接口調用:

建立一個 MTW 對象,訂閱流表

// 建立writer對象
MultithreadedTableWriter writer(
            "183.136.170.167", 9900, "admin","123456","","streamtable",NULL,false,NULL,1000,1,5,"deviceid", &compress);  
MultithreadedTableWriter::Status status;  // 儲存 writer 狀态           

這裡需要說明的是,本文着重介紹 API 的寫入,通過模拟來展示從第三方平台采集資料到 API 端寫入這一過程。此外,本場景在 API 端使用單線程寫入資料,使用者可根據實際場景使用多線程提高 API 端寫入效率,完整代碼見附件 API_mtw.cpp。

// 模拟接受批量資料,建立單線程寫入資料
// bt 模拟接收消息中間件發送的資料,按裝置(每台裝置1000條資料)周遊采集資料
for(int i=0;i < (bt->rows())/1000;i++){
	system_clock::duration begin = system_clock::now().time_since_epoch();
	milliseconds milbegin = duration_cast<milliseconds>(begin);
	// 每台資料共1000個測點,寫入1000行
	for(int j=i*1000;j<(i+1)*1000;j++){
		ErrorCodeInfo pErrorInfo;
		// 模拟對單條資料6個字段的寫入
		writer.insert(pErrorInfo,
			datas[i*6+0], datas[i*6+1], datas[i*6+2], datas[i*6+3], datas[i*6+4], datas[i*6+5]
		)
	}
	system_clock::duration end = system_clock::now().time_since_epoch();
	milliseconds milend = duration_cast<milliseconds>(end);
	if((milend.count()-milbegin.count())<5000){
		// 控制模拟寫入的頻率
		sleep_for(std::chrono::milliseconds(5000-(milend.count()-milbegin.count())));
	}
}           

若背景線程發生錯誤,MTW 可能退出後未将資料全部寫入伺服器(包括導緻背景線程錯誤的那一批資料,這批資料可能已經寫入伺服器也可能未寫入伺服器)

// 檢查寫入完成後 MTW 狀态
writer.getStatus(status);           

該情況下首先擷取未完成寫入的資料

// 擷取未寫入的資料
std::vector<std::vector<ConstantSP>*> unwrittenData;
writer.getUnwrittenData(unwrittenData);
cout << "Unwritten data length " << unwrittenData.size() << endl;           

重新寫入上述資料

// 重新寫入這些資料,原有的 MTW 因為異常退出已經不能用了,需要建立新的 MTW
MultithreadedTableWriter newWriter("192.168.0.61", 8848, "admin", "123456", "dfs://test_MultithreadedTableWriter", "collect", NULL,false,NULL,10000,1,10,"deviceid", &compress);
ErrorCodeInfo errorInfo;
// 插入擷取到的未寫入資料    
if (newWriter.insertUnwrittenData(unwrittenData, errorInfo)) {
	// 等待寫入完成後檢查狀态
	newWriter.waitForThreadCompletion();
	newWriter.getStatus(status);
	if (status.hasError()) {
		cout << "error in write again: " << status.errorInfo << endl;
	}
}
else {
	cout << "error in write again: " << errorInfo.errorInfo << endl;
}           

附件:

https://gitee.com/dolphindb/Tutorials_CN/tree/master/script/ddb_cpp_api_connector

[狗頭]文内完整連結,請前往知乎@DolphinDB 檢視學習~

繼續閱讀