本文為 DolphinDB C++ API (連接配接器)寫入接口的使用指南,使用者在有資料寫入需求時,可以根據本篇教程快速明确地選擇寫入方式。本文将從使用場景介紹、原理簡述、函數使用、場景實踐四部分進行具體闡述。
一、場景介紹
目前大資料技術已廣泛應用到金融、物聯網等行業,而海量資料的寫入是大資料處理和分析的基礎。在實際應用中,資料的産生方式和采集途徑多種多樣,DolphinDB 作為輕量級的大資料平台,提供多種資料寫入方式,使用者可以在不同應用場景下選擇最合适的方式進行資料寫入。
以工業物聯網場景為例,裝置資料的寫入場景通常可分為兩類。
(1)多裝置資料分散寫入。
如某廠區有100台裝置,每台裝置通過獨立的傳輸鍊路将資料一條條發送到 API 端,再統一通過 API 端寫入到 DolphinDB。
多裝置資料分散寫入
(2)裝置資料彙總後再寫入 API。 如某廠區有100台裝置,用采集服務(如 Kafka)将裝置資料進行彙總,再統一通過 API 寫入 DolphinDB。
裝置資料彙總寫入
針對以上場景,DolphinDB C++ API 提供了多種寫入方法,以實作不同來源資料的高效寫入:
裝置場景 | 寫入DolphinDB表的類型 | 調用DolphinDB 函數 | 實作方式 |
多裝置資料一條條分散寫入 | ALL | MTW(MultithreadedTableWriter) | 緩沖行資料後并行寫入 |
裝置資料彙總寫入 | ALL | MTW(MultithreadedTableWriter) | 将合并的行資料并行寫入 |
裝置資料彙總寫入 | 記憶體表 | tableInsert | 單表寫入 |
裝置資料彙總寫入 | 分布式表 | PTA(PartitionedTableAppender) | 多線程按列批量寫入 |
裝置資料彙總寫入 | 分布式表 | AFTA(AutoFitTableAppender) | 單線程按列批量寫入 |
MTW 的寫入方式可以适配多種寫入場景,推薦首次接觸 DolphinDB 的使用者使用 MTW 方法;
tableInsert 方法可以将彙總資料簡單快速地寫入記憶體表;
而對于分布式表,C++ API 提供了保障并行寫入的 PTA 方法,以及更簡單易用,能自動轉換寫入資料字段類型的 AFTA/AFTU 方法。
二、原理簡述
傳統的開發人員通常對關系型資料庫的行式存儲(Row-Based)比較熟悉,資料按單行或多行的方式送出并寫入,這種寫入方式很容易了解,但是基于行式存儲的資料庫實際上并不是為大資料處理而設計的,海量資料的寫入很容易遇到性能瓶頸。
DolphinDB 采用列式存儲(Column-Based),在記憶體中維護一個 Cache Engine,當資料寫入檔案時,并不是直接寫入到磁盤,而是先寫入作業系統的緩沖頁面中,再批量寫入磁盤。為了確定寫入資料不會在記憶體中丢失, DolphinDB 使用 WAL(Write Ahead Logging)的機制。詳情可參考 DolphinDB 使用者手冊的資料模型。
以一個5列(字段)的資料表為例,寫入100萬行的資料時,行式存儲按行方式送出并寫入,需要執行100萬次的檔案寫入操作;而列式存儲對單列進行寫入,可以按列一次性送出100萬個值,最少僅需5次檔案操作就能完成資料寫入。兩種寫入方式在海量資料的處理方面性能差異巨大。
行式存儲和列式存儲對比(圖檔來源于網絡)
通常我們在為大資料應用場景規劃寫入方式前,需要了解以上列式存儲的寫入方式。按列批量寫入能最大化發揮列式存儲的優勢,而當實際場景下多個裝置寫入資料較為分散時,可以選擇有資料緩沖的 API 方法如 MTW,以獲得最佳寫入性能。
DolphinDB C++ API 支援多種資料寫入方法,涵蓋多樣化的寫入場景需求,主要特點如下:
寫入方式 | 特點 |
MTW | -官方推薦用法 -按行接收資料 -内置資料緩沖隊列 -多線程異步并發寫入 |
tableInsert | - 友善簡單,速度快 - 事務機制下,同一分區不能同時寫入兩條資料,是以不建議寫入分布式表 |
PTA | - 按表寫入 - 内置連接配接池 - 自動按分區同步并行寫入 |
AFTA | - 自動轉換字段類型寫入 - 适用于曆史資料整表落盤,追加寫入 - 單線程同步寫入 |
AFTU | - AFTA 更新寫的版本 |
BatchTableWriter(舊版本) | - 因相容性而保留的舊版函數 - 實時資料落盤,資料按行寫入 - 單線程同步寫入 |
- MTW 支援高效按行寫入,通過内置資料緩沖隊列,MTW 将資料統一發送到 DolphinDB ,可以保證單條資料的寫入效率,适用于多裝置一條條分散寫入場景。當性能要求不高時,也可用于第三方平台彙總資料後批量寫入 API 的場景。 MTW 是對 BatchTableWriter(舊版本)的更新,二者均支援資料分散地從第三方平台傳輸到用戶端的場景。MTW 的預設功能和 BatchTableWriter 一緻,但支援多線程的并發寫入。目前 BatchTableWriter 方式已經完全被 MTW 替代,僅因為相容性而保留。
- tableInsert 使用簡單高效,可以支援資料彙總寫入場景,若寫入的 DolphinDB 表為記憶體表,可以選擇 tableInsert 或者 PTA;但 tableInsert 沒有分區寫入保障機制,在開啟事務機制的情況下,不建議寫入分布式表。
- PTA 能夠自動按分區實作同步并行寫入,适用于資料彙總寫入場景。按列并行寫入的機制確定了 PTA 方式在批量寫入場景下擁有性能優勢。
- AFTA 能夠自動将 C++ 字段類型轉換為 DolphinDB 字段類型完成寫入,使用上較 PTA 更為簡單,同樣适合資料彙總寫入場景。PTA 的寫入速度要好于 AFTA,在對寫入效率有要求且僅進行追加寫的情況下,建議優先考慮 PTA。
- AFTU 是 AFTA 的更新寫版本,更适合于重複資料存在的場景,讀取新資料不存在重複時直接插入,存在重複時更新。針對資料寫入是否需要更新,即當寫入的資料在資料庫中已有相同的主鍵或者相同的指定字段時,選擇更新該條舊資料或者直接插入新資料,C++ API 給出了不同的寫入方式。其中,MTW 内部分别實作了更新寫和追加寫,以 mode 參數的形式提供選擇;而 PTA 僅提供了追加寫的方式。
MTW,PTA,AFTA,AFTU 四種方法涵蓋了絕大多數寫入場景,其底層實作均調用了 tableInsert 或 upsert! (DolphinDB 腳本函數,關于 tableInsert 的更多介紹請參考 https://www.dolphindb.cn/cn/help/FunctionsandCommands/FunctionReferences/t/tableInsert.html)。下節将重點介紹 MTW,PTA,AFTA,AFTU 四種函數的使用。
DolphinDB C++ API 的具體安裝教程可參考 https://gitee.com/dolphindb/api-cplusplus/blob/release200/README_CN.md
三、函數使用
1. MultithreadedTableWriter(MTW)
MTW 可以向記憶體表、流表、分區表、次元表中寫入資料。不僅在内部實作并發寫入,還可在 API 端建立多個 MTW 對象并發執行寫入任務,MTW 也支援整型、時間等類型的内部自動轉換。
MTW 在 API 端維護一個資料緩沖隊列,API 端可調用寫線程将資料按條持續寫入緩沖隊列,資料在緩沖隊列堆積到一定數量後将一并被傳送到伺服器端。用戶端建立出使用者指定數目的 DolphinDB 連接配接,然後按照分區配置設定寫入資料。在事務機制下,DolphinDB 不允許多個線程同時向同一個分區寫入資料。
首先建立一個 MTW 對象。建立 MTW 時可指定每列的壓縮方式。代碼如下:
vector<COMPRESS_METHOD> compress;
for(int i=0;i<102;i++)compress.push_back(COMPRESS_LZ4); // 每列的壓縮方式
MultithreadedTableWriter writer(
"127.0.0.1", 9900, "admin","123456","dfs://test_MultithreadedTableWriter","collect",NULL,false,NULL,1000,1,10,"deviceid", &compress);
MTW 的構造函數參數詳見 README_CN.md · dolphindb/api-cplusplus - Gitee 。另外,dbName,partitionColumnName,threadCount 三個參數在寫入不同類型的表時有很大差別,具體見下表。
表類型 | 參數1:dbName | 參數2:partitionColumnName | 參數3:threadCount |
記憶體表 | “” | 任意字段 | >=1 |
流表 | “” | 任意字段 | >=1 |
分區表 | 實際資料庫名 | 某個分區字段 | >=1 |
次元表 | 實際資料庫名 | “” | 1 |
接着在 API 端建立子線程插入資料到緩沖隊列。當需要寫入的資料量較大時,可根據實際情況在 API 端使用更多的線程。insert 方法需傳入一個 ErrorCodeInfo 對象和一串變長參數,每個變長參數都代表一個字段值。
具體代碼如下:
int rows = 1000; //行數
int cols = 5; //列數
vector<ConstantSP> datas;
TableSP bt = conn.run("t0 = loadText('"+DATA_FIRE+"');t0");// 模拟資料源從 csv 檔案導入
for(int i = 0; i< rows; ++i){
for(int j = 0; j < cols; ++j)
datas.emplace_back(bt->getColumn(j)->get(i));
}
// 建立線程
thread t([&]() {
try {
for(int i=0;i < bt->rows();i++){
ErrorCodeInfo pErrorInfo;
writer.insert(pErrorInfo,
datas[0], datas[1], datas[2], datas[3], datas[4] // 含5個字段的資料
);
}
}catch (exception &e) {
cerr << "MTW exit with exception: " << e.what() << endl;
}
});
// 等待插入線程結束
t.join();
在 MTW 運作時,可能會發生寫入錯誤,使用下述代碼,擷取對象目前的運作狀态。
MultithreadedTableWriter::Status status;
writer.getStatus(status);
if (status.hasError()) {
cout << "error in writing: " << status.errorInfo << endl;
}
status 對象的屬性和方法詳見 README_CN.md · dolphindb/api-cplusplus - Gitee 。
注意,API 端的寫入線程結束不代表 MTW 完全退出,需使用 waitForThreadCompletion 方法等待 MTW 完全退出。同時,MTW 需要在記憶體中緩存寫入資料,當 API 異常退出時可能會造成已緩存資料的丢失,是以需要合理配置緩存寫入數量,以适配性能和高可用場景需求。
當 MTW 寫入出現錯誤,需調用 getUnwrittenData 方法擷取未寫入資料。若有未寫入資料,則需再次建立 MTW 對象進行寫入。代碼如下:
writer.getStatus(status);
if (status.hasError()) {
cout << "error after write complete: " << status.errorInfo << endl;
// 擷取未寫入的資料
std::vector<std::vector<ConstantSP>*> unwrittenData;
writer.getUnwrittenData(unwrittenData);
cout << "unwriterdata length " << unwrittenData.size() << endl;
if (!unwrittenData.empty()) {
try {
// 重新寫入這些資料,原有的 MTW 因為異常退出已經不能用了,需要建立新的 MTW
cout << "create new MTW and write again." << endl;
MultithreadedTableWriter newWriter("183.136.170.167", 9900, "admin", "123456", "dfs://test_MultithreadedTableWriter", "collect", NULL,false,NULL,1000,1,5,"deviceid", &compress);
ErrorCodeInfo errorInfo;
// 插入未寫入的資料
if (newWriter.insertUnwrittenData(unwrittenData, errorInfo)) {
// 等待寫入完成後檢查狀态
newWriter.waitForThreadCompletion();
newWriter.getStatus(status);
if (status.hasError()) {
cout << "error in write again: " << status.errorInfo << endl;
}
}
else {
cout << "error in write again: " << errorInfo.errorInfo << endl;
}
}
catch (exception &e) {
cerr << "new MTW exit with exception: " << e.what() << endl;
}
}
}
至此,MTW 的使用流程介紹完畢。
2. PartitionedTableAppender(PTA)
PTA 設計一個連接配接池,擷取分布式表的分區資訊後,将分區配置設定給連接配接池來并行寫入。
PartitionedTableAppender 可向分布式表中寫入資料
PTA 的使用簡潔友善,建立一個連接配接池對象和 PTA 對象。注意,對象建立時需要指定寫入表的分區字段,盡量使分區個數與連接配接池中連接配接個數相同。因為線程多反而會增加線程建立和銷毀開銷,而線程少無法最大利用伺服器資源。當每個分區都能同時進行寫入并且沒有多餘的線程建立,PTA 寫入效率最高,資源配置設定也最合理。代碼如下:
DBConnectionPool pool("127.0.0.1", 9900, 5, "admin", "123456");
// 分區列傳入 deviceid 或 ts 均可,保證可以使用多線程寫入資料集多個分區,因為 DolphinDB 開啟事務時不允許多個 writer 同時寫入到一個分區内
PartitionedTableAppender appender("dfs://test_PartitionedTableAppender", "collect","deviceid", pool);
appender.append(bt);
pool.shutDown();
若目前連接配接池不再使用,會自動被釋放,但存在釋放延時,可以通過調用 shutDown() 等待線程任務執行結束後立即釋放連接配接。
3. AutoFitTableAppender(AFTA)
AFTA 建立與 server 的連接配接後,對列數、列字段名、列字段類型等基礎資訊進行判斷,完成整型、時間類型等字段的自動轉換,随即使用 tableInsert 進行寫入,目前 AFTA 與 AFTU 尚不支援整型與浮點型資料間的轉換。
AutoFitTableAppender 内部實作簡單,實用性高。可向流表、記憶體表、分區表、磁盤表寫入資料
AFTA 的使用較 PTA 更為簡單,建立完 AFTA 對象後即可調用 append() 寫入。具體代碼如下:
AutoFitTableAppender appender("dfs://test_AutoFitTableAppender", "collect", conn);
appender.append(bt);
4. AutoFitTableUpsert(AFTU)
建立 AFTU 時可指定字段 keycolName,當新插入資料的指定字段不與資料庫中已有資料重複時,AFTU 直接将資料插入,而當該字段出現重複時,AFTU 可以對該條資料進行更新。
AutoFitTableUpsert 更适合于有重複資料寫入的場景
類似于 AFTA,通過建立的 AFTU,調用 upsert() 即可完成資料的寫入和更新:
vector<string> keycolName = {"id"};
AutoFitTableUpsert aftu("dfs://test_AutoFitTableUpsert", "collect", conn, false, &keycolName);
aftu.upsert(bt);
以下就追加寫入和更新寫入場景提供了更全面的選擇參考:
資料寫入場景 | 字段類型自動比對 | 追加寫入 | 更新寫入 |
多裝置分散寫入 | 否 | MTW | MTW |
資料彙總寫入 | 否 | PTA、MTW | MTW |
資料彙總寫入 | 是 | AFTA、MTW | AFTU、MTW |
在底層實作上,AFTA 和 AFTU 通過整表插入的方式實作批量資料寫入;而 MTW 通過維護資料緩沖隊列實作批量資料異步寫入。若需要使用 C++ API 實作整表資料的寫入,推薦使用 AFTA 或 AFTU。
四、場景實踐
以下場景案例展示了使用 DolphinDB C++ API 實作資料寫入的流程:
某裝置實驗平台有100台裝置,單台裝置有1000個測點,實驗平台需要采集裝置的測點資訊進而評估裝置的使用情況。
實驗平台要求測點資訊按單值模型存儲,每台裝置每隔5分鐘對所有1000個測點進行資料采集,彙總所有裝置的資料後通過消息中間件統一傳輸到 API 端。實驗平台要求支援對資料的批量寫入,同時保證資料類型的一緻性,不需要資料類型自動轉換;若用戶端意外崩潰,重新開機後 API 可重新接受資料。這種場景下采用 MTW 方法将實時資料寫入資料庫,其流程圖如下:
實時資料落盤流程圖
資料集:
- 記錄描述:100台裝置,每台1000個測點,采集頻率5分鐘1次,采集持續10天
- 記錄行數:2.6億行
- 磁盤占用:1116 MB
- 字段數量:6
- 字段樣式:
- ts:數采時間
- deviceCode:裝置編号
- logicalPostionId:邏輯位置ID
- physicalPostionId:實體位置ID
- propertyCode:屬性測點編碼
- propertyValue:測點值(累計産量)
準備工作:
首先要在 server 端建立分布式資料庫 db_demo、分區表 collect:
// 建立分布式資料庫及分區表
dbname="dfs://db_demo"
tablename="collect"
cols_info=`ts`deviceCdoe`logicalPostionId`physicalPostionId`propertyCode`propertyValue
cols_type=[DATETIME,SYMBOL,SYMBOL,SYMBOL,SYMBOL,INT]
t=table(1:0,cols_info,cols_type)
db=database(dbname,VALUE,[2022.11.01],engine=`TSDB)
pt=createPartitionedTable(db,t,tablename,`ts,,`deviceCdoe`ts)
然後建立一張流資料表 streamtable,使用 MTW 方式将資料寫入這張流表,然後訂閱流表,資料将從流表流向分區表 collect:
// 建立流表
def saveToDFS(mutable dfstable, msg): dfstable.append!(msg)
share streamTable(1:0, cols_info, cols_type) as streamtable;
subscribeTable(tableName="streamtable", actionName="savetodfs", offset=0, handler=saveToDFS{pt}, msgAsTable=true, batchSize=1000, throttle=1)
也可直接在 C++ 代碼中使用 conn.run(script) 的方式運作此段代碼。
接口調用:
建立一個 MTW 對象,訂閱流表
// 建立writer對象
MultithreadedTableWriter writer(
"183.136.170.167", 9900, "admin","123456","","streamtable",NULL,false,NULL,1000,1,5,"deviceid", &compress);
MultithreadedTableWriter::Status status; // 儲存 writer 狀态
這裡需要說明的是,本文着重介紹 API 的寫入,通過模拟來展示從第三方平台采集資料到 API 端寫入這一過程。此外,本場景在 API 端使用單線程寫入資料,使用者可根據實際場景使用多線程提高 API 端寫入效率,完整代碼見附件 API_mtw.cpp。
// 模拟接受批量資料,建立單線程寫入資料
// bt 模拟接收消息中間件發送的資料,按裝置(每台裝置1000條資料)周遊采集資料
for(int i=0;i < (bt->rows())/1000;i++){
system_clock::duration begin = system_clock::now().time_since_epoch();
milliseconds milbegin = duration_cast<milliseconds>(begin);
// 每台資料共1000個測點,寫入1000行
for(int j=i*1000;j<(i+1)*1000;j++){
ErrorCodeInfo pErrorInfo;
// 模拟對單條資料6個字段的寫入
writer.insert(pErrorInfo,
datas[i*6+0], datas[i*6+1], datas[i*6+2], datas[i*6+3], datas[i*6+4], datas[i*6+5]
)
}
system_clock::duration end = system_clock::now().time_since_epoch();
milliseconds milend = duration_cast<milliseconds>(end);
if((milend.count()-milbegin.count())<5000){
// 控制模拟寫入的頻率
sleep_for(std::chrono::milliseconds(5000-(milend.count()-milbegin.count())));
}
}
若背景線程發生錯誤,MTW 可能退出後未将資料全部寫入伺服器(包括導緻背景線程錯誤的那一批資料,這批資料可能已經寫入伺服器也可能未寫入伺服器)
// 檢查寫入完成後 MTW 狀态
writer.getStatus(status);
該情況下首先擷取未完成寫入的資料
// 擷取未寫入的資料
std::vector<std::vector<ConstantSP>*> unwrittenData;
writer.getUnwrittenData(unwrittenData);
cout << "Unwritten data length " << unwrittenData.size() << endl;
重新寫入上述資料
// 重新寫入這些資料,原有的 MTW 因為異常退出已經不能用了,需要建立新的 MTW
MultithreadedTableWriter newWriter("192.168.0.61", 8848, "admin", "123456", "dfs://test_MultithreadedTableWriter", "collect", NULL,false,NULL,10000,1,10,"deviceid", &compress);
ErrorCodeInfo errorInfo;
// 插入擷取到的未寫入資料
if (newWriter.insertUnwrittenData(unwrittenData, errorInfo)) {
// 等待寫入完成後檢查狀态
newWriter.waitForThreadCompletion();
newWriter.getStatus(status);
if (status.hasError()) {
cout << "error in write again: " << status.errorInfo << endl;
}
}
else {
cout << "error in write again: " << errorInfo.errorInfo << endl;
}
附件:
https://gitee.com/dolphindb/Tutorials_CN/tree/master/script/ddb_cpp_api_connector
[狗頭]文内完整連結,請前往知乎@DolphinDB 檢視學習~