摘要:本文所介紹 Nebula Graph 連接配接器 Nebula Flink Connector,采用類似 Flink 提供的 Flink Connector 形式,支援 Flink 讀寫分布式圖資料庫 Nebula Graph。
文章首發 Nebula Graph 官網部落格: https://nebula-graph.com.cn/posts/nebula-flink-connector/

在關系網絡分析、關系模組化、實時推薦等場景中應用圖資料庫作為背景資料支撐已相對普及,且部分應用場景對圖資料的實時性要求較高,如推薦系統、搜尋引擎。為了提升資料的實時性,業界廣泛應用流式計算對更新的資料進行增量實時處理。為了支援對圖資料的流式計算,Nebula Graph 團隊開發了 Nebula Flink Connector,支援利用 Flink 進行 Nebula Graph 圖資料的流式處理和計算。
Flink 是新一代流批統一的計算引擎,它從不同的第三方存儲引擎中讀取資料,并進行處理,再寫入另外的存儲引擎中。Flink Connector 的作用就相當于一個連接配接器,連接配接 Flink 計算引擎跟外界存儲系統。
與外界進行資料交換時,Flink 支援以下 4 種方式:
- Flink 源碼内部預定義 Source 和 Sink 的 API;
- Flink 内部提供了 Bundled Connectors,如 JDBC Connector。
- Apache Bahir 項目中提供連接配接器
Apache Bahir 最初是從 Apache Spark 中獨立出來的項目,以提供不限于 Spark 相關的擴充/插件、連接配接器和其他可插入元件的實作。
- 通過異步 I/O 方式。
流計算中經常需要與外部存儲系統互動,比如需要關聯 MySQL 中的某個表。一般來說,如果用同步 I/O 的方式,會造成系統中出現大的等待時間,影響吞吐和延遲。異步 I/O 則可以并發處理多個請求,提高吞吐,減少延遲。
本文所介紹 Nebula Graph 連接配接器 Nebula Flink Connector,采用類似 Flink 提供的 Flink Connector 形式,支援 Flink 讀寫分布式圖資料庫 Nebula Graph。
一、Connector Source
Flink 作為一款流式計算架構,它可處理有界資料,也可處理無界資料。所謂無界,即源源不斷的資料,不會有終止,實時流處理所處理的資料便是無界資料;批處理的資料,即有界資料。而 Source 便是 Flink 處理資料的資料來源。
Nebula Flink Connector 中的 Source 便是圖資料庫 Nebula Graph。Flink 提供了豐富的 Connector 元件允許使用者自定義資料源來連接配接外部資料存儲系統。
1.1 Source 簡介
Flink 的 Source 主要負責外部資料源的接入,Flink 的 Source 能力主要是通過 read 相關的 API 和 addSource 方法這 2 種方式來實作資料源的讀取,使用 addSource 方法對接外部資料源時,可以使用 Flink Bundled Connector,也可以自定義 Source。
Flink Source 的幾種使用方式如下:
本章主要介紹如何通過自定義 Source 方式實作 Nebula Graph Source。
1.2 自定義 Source
在 Flink 中可以使用
StreamExecutionEnvironment.addSource(sourceFunction)
和
ExecutionEnvironment.createInput(inputFormat)
兩種方式來為你的程式添加資料來源。
Flink 已經提供多個内置的
source functions
,開發者可以通過繼承
RichSourceFunction
來自定義非并行的
source
,通過繼承
RichParallelSourceFunction
來自定義并行的
Source
。
RichSourceFunction
和
RichParallelSourceFunction
是
SourceFunction
RichFunction
特性的結合。 其中
SourceFunction
負責資料的生成,
RichFunction
負責資源的管理。當然,也可以隻實作
SourceFunction
接口來定義最簡單的隻具備擷取資料功能的
dataSource
。
通常自定義一個完善的 Source 節點是通過實作
RichSourceFunction
類來完成的,該類兼具
RichFunction
SourceFunction
的能力,是以自定義 Flink 的 Nebula Graph Source 功能我們需要實作
RichSourceFunction
中提供的方法。
1.3 自定義 Nebula Graph Source 實作原理
Nebula Flink Connector 中實作的自定義 Nebula Graph Source 資料源提供了兩種使用方式,分别是 addSource 和 createInput 方式。
Nebula Graph Source 實作類圖如下:
(1)addSource
該方式是通過 NebulaSourceFunction 類實作的,該類繼承自 RichSourceFunction 并實作了以下方法:
-
open
準備 Nebula Graph 連接配接資訊,并擷取 Nebula Graph Meta 服務和 Storage 服務的連接配接。
-
close
資料讀取完成,釋放資源。關閉 Nebula Graph 服務的連接配接。
-
run
開始讀取資料,并将資料填充到 sourceContext。
-
cancel
取消 Flink 作業時調用,關閉資源。
(2)createInput
該方式是通過 NebulaInputFormat 類實作的,該類繼承自 RichInputFormat 并實作了以下方法:
-
openInputFormat
準備 inputFormat,擷取連接配接。
-
closeInputFormat
資料讀取完成,釋放資源,關閉 Nebula Graph 服務的連接配接。
-
getStatistics
擷取資料源的基本統計資訊。
-
createInputSplits
基于配置的 partition 參數建立 GenericInputSplit。
-
getInputSplitAssigner
傳回輸入的 split 配置設定器,按原始計算的順序傳回 Source 的所有 split。
- 開始 inputFormat 的資料讀取,将讀取的資料轉換 Flink 的資料格式,構造疊代器。
- 資料讀取完成,列印讀取日志。
-
reachedEnd
是否讀取完成
-
nextRecord
通過疊代器擷取下一條資料
通過 addSource 讀取 Source 資料得到的是 Flink 的 DataStreamSource,表示 DataStream 的起點。
通過 createInput 讀取資料得到的是 Flink 的 DataSource,DataSource 是一個建立新資料集的 Operator,這個 Operator 可作為進一步轉換的資料集。DataSource 可以通過 withParameters 封裝配置參數進行其他的操作。
1.4 自定義 Nebula Graph Source 應用實踐
使用 Flink 讀取 Nebula Graph 圖資料時,需要構造 NebulaSourceFunction 和 NebulaOutputFormat,并通過 Flink 的 addSource 或 createInput 方法注冊資料源進行 Nebula Graph 資料讀取。
構造 NebulaSourceFunction 和 NebulaOutputFormat 時需要進行用戶端參數的配置和執行參數的配置,說明如下:
配置項說明:
- NebulaClientOptions
- 配置 address,NebulaSource 需要配置 Nebula Graph Metad 服務的位址。
- 配置 username
- 配置 password
- VertexExecutionOptions
- 配置 GraphSpace
- 配置要讀取的 tag
- 配置要讀取的字段集
- 配置是否讀取所有字段,預設為 false, 若配置為 true 則字段集配置無效
- 配置每次讀取的資料量 limit,預設 2000
- EdgeExecutionOptions
- 配置要讀取的 edge
// 構造 Nebula Graph 用戶端連接配接需要的參數
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions
.NebulaClientOptionsBuilder()
.setAddress("127.0.0.1:45500")
.build();
// 建立 connectionProvider
NebulaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);
// 構造 Nebula Graph 資料讀取需要的參數
List<String> cols = Arrays.asList("name", "age");
VertexExecutionOptions sourceExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
.setGraphSpace("flinkSource")
.setTag(tag)
.setFields(cols)
.setLimit(100)
.builder();
// 構造 NebulaInputFormat
NebulaInputFormat inputFormat = new NebulaInputFormat(metaConnectionProvider)
.setExecutionOptions(sourceExecutionOptions);
// 方式 1 使用 createInput 方式注冊 Nebula Graph 資料源
DataSource<Row> dataSource1 = ExecutionEnvironment.getExecutionEnvironment()
.createInput(inputFormat);
// 方式 2 使用 addSource 方式注冊 Nebula Graph 資料源
NebulaSourceFunction sourceFunction = new NebulaSourceFunction(metaConnectionProvider)
.setExecutionOptions(sourceExecutionOptions);
DataStreamSource<Row> dataSource2 = StreamExecutionEnvironment.getExecutionEnvironment()
.addSource(sourceFunction);
Nebula Source Demo 編寫完成後可以打包送出到 Flink 叢集執行。
示例程式讀取 Nebula Graph 的點資料并列印,該作業以 Nebula Graph 作為 Source,以 print 作為 Sink,執行結果如下:
Source sent 資料為 59,671,064 條,Sink received 資料為 59,671,064 條。
二、Connector Sink
Nebula Flink Connector 中的 Sink 即 Nebula Graph 圖資料庫。Flink 提供了豐富的 Connector 元件允許使用者自定義資料池來接收 Flink 所處理的資料流。
2.1 Sink 簡介
Sink 是 Flink 處理完 Source 後資料的輸出,主要負責實時計算結果的輸出和持久化。比如:将資料流寫入标準輸出、寫入檔案、寫入 Sockets、寫入外部系統等。
Flink 的 Sink 能力主要是通過調用資料流的 write 相關 API 和 DataStream.addSink 兩種方式來實作資料流的外部存儲。
類似于 Flink Connector 的 Source,Sink 也允許使用者自定義來支援豐富的外部資料系統作為 Flink 的資料池。
Flink Sink 的使用方式如下:
本章主要介紹如何通過自定義 Sink 的方式實作 Nebula Graph Sink。
2.2 自定義 Sink
DataStream.addSink
DataStream.writeUsingOutputFormat
的方式将 Flink 資料流寫入外部自定義資料池。
Flink 已經提供了若幹實作好了的
Sink Functions
,也可以通過實作
SinkFunction
以及繼承
RichOutputFormat
來實作自定義的 Sink。
2.3 自定義 Nebula Graph Sink 實作原理
Nebula Flink Connector 中實作了自定義的 NebulaSinkFunction,開發者通過調用 DataSource.addSink 方法并将 NebulaSinkFunction 對象作為參數傳入即可實作将 Flink 資料流寫入 Nebula Graph。
Nebula Flink Connector 使用的是 Flink 的 1.11-SNAPSHOT 版本,該版本中已經廢棄了使用 writeUsingOutputFormat 方法來定義輸出端的接口。
源碼如下,是以請注意在使用自定義 Nebula Graph Sink 時請采用 DataStream.addSink 的方式。
/** @deprecated */
@Deprecated
@PublicEvolving
public DataStreamSink<T> writeUsingOutputFormat(OutputFormat<T> format) {
return this.addSink(new OutputFormatSinkFunction(format));
}
Nebula Graph Sink 實作類圖如下:
其中最重要的兩個類是 NebulaSinkFunction 和 NebulaBatchOutputFormat。
NebulaSinkFunction 繼承自 AbstractRichFunction 并實作了以下方法:
- 調用 NebulaBatchOutputFormat 的 open 方法,進行資源準備。
- 調用 NebulaBatchOutputFormat 的 close 方法,進行資源釋放。
-
invoke
是 Sink 中的核心方法, 調用 NebulaBatchOutputFormat 中的 write 方法進行資料寫入。
-
flush
調用 NebulaBatchOutputFormat 的 flush 方法進行資料的送出。
NebulaBatchOutputFormat 繼承自 AbstractNebulaOutPutFormat,AbstractNebulaOutPutFormat 繼承自 RichOutputFormat,主要實作的方法有:
- 準備圖資料庫 Nebula Graph 的 Graphd 服務的連接配接,并初始化資料寫入執行器 nebulaBatchExecutor
- 送出最後批次資料,等待最後送出的回調結果并關閉服務連接配接等資源。
-
writeRecord
核心方法,将資料寫入 nebulaBufferedRow 中,并在達到配置的批量寫入 Nebula Graph 上限時送出寫入。Nebula Graph Sink 的寫入操作是異步的,是以需要執行回調來擷取執行結果。
- 當 bufferRow 存在資料時,将資料送出到 Nebula Graph 中。
在 AbstractNebulaOutputFormat 中調用了 NebulaBatchExecutor 進行資料的批量管理和批量送出,并通過定義回調函數接收批量送出的結果,代碼如下:
/**
* write one record to buffer
*/
@Override
public final synchronized void writeRecord(T row) throws IOException {
nebulaBatchExecutor.addToBatch(row);
if (numPendingRow.incrementAndGet() >= executionOptions.getBatch()) {
commit();
}
}
/**
* put record into buffer
*
* @param record represent vertex or edge
*/
void addToBatch(T record) {
boolean isVertex = executionOptions.getDataType().isVertex();
NebulaOutputFormatConverter converter;
if (isVertex) {
converter = new NebulaRowVertexOutputFormatConverter((VertexExecutionOptions) executionOptions);
} else {
converter = new NebulaRowEdgeOutputFormatConverter((EdgeExecutionOptions) executionOptions);
}
String value = converter.createValue(record, executionOptions.getPolicy());
if (value == null) {
return;
}
nebulaBufferedRow.putRow(value);
}
/**
* commit batch insert statements
*/
private synchronized void commit() throws IOException {
graphClient.switchSpace(executionOptions.getGraphSpace());
future = nebulaBatchExecutor.executeBatch(graphClient);
// clear waiting rows
numPendingRow.compareAndSet(executionOptions.getBatch(),0);
}
/**
* execute the insert statement
*
* @param client Asynchronous graph client
*/
ListenableFuture executeBatch(AsyncGraphClientImpl client) {
String propNames = String.join(NebulaConstant.COMMA, executionOptions.getFields());
String values = String.join(NebulaConstant.COMMA, nebulaBufferedRow.getRows());
// construct insert statement
String exec = String.format(NebulaConstant.BATCH_INSERT_TEMPLATE, executionOptions.getDataType(), executionOptions.getLabel(), propNames, values);
// execute insert statement
ListenableFuture<Optional<Integer>> execResult = client.execute(exec);
// define callback function
Futures.addCallback(execResult, new FutureCallback<Optional<Integer>>() {
@Override
public void onSuccess(Optional<Integer> integerOptional) {
if (integerOptional.isPresent()) {
if (integerOptional.get() == ErrorCode.SUCCEEDED) {
LOG.info("batch insert Succeed");
} else {
LOG.error(String.format("batch insert Error: %d",
integerOptional.get()));
}
} else {
LOG.error("batch insert Error");
}
}
@Override
public void onFailure(Throwable throwable) {
LOG.error("batch insert Error");
}
});
nebulaBufferedRow.clean();
return execResult;
}
由于 Nebula Graph Sink 的寫入是批量、異步的,是以在最後業務結束 close 資源之前需要将緩存中的批量資料送出且等待寫入操作的完成,以防在寫入送出之前提前把 Nebula Graph Client 關閉,代碼如下:
/**
* commit the batch write operator before release connection
*/
@Override
public final synchronized void close() throws IOException {
if(numPendingRow.get() > 0){
commit();
}
while(!future.isDone()){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
LOG.error("sleep interrupted, ", e);
}
}
super.close();
}
2.4 自定義 Nebula Graph Sink 應用實踐
Flink 将處理完成的資料 Sink 到 Nebula Graph 時,需要将 Flink 資料流進行 map 轉換成 Nebula Graph Sink 可接收的資料格式。自定義 Nebula Graph Sink 的使用方式是通過 addSink 形式,将 NebulaSinkFunction 作為參數傳給 addSink 方法來實作 Flink 資料流的寫入。
-
- 配置 address,NebulaSource 需要配置 Nebula Graph Graphd 服務的位址。
-
- 配置要寫入的 tag
- 配置要寫入的字段集
- 配置寫入的點 ID 所在 Flink 資料流 Row 中的索引
- 配置批量寫入 Nebula Graph 的數量,預設 2000
-
- 配置要寫入的 edge
- 配置寫入的邊 src-id 所在 Flink 資料流 Row 中的索引
- 配置寫入的邊 dst-id 所在 Flink 資料流 Row 中的索引
- 配置寫入的邊 rank 所在 Flink 資料流 Row 中的索引,不配則無 rank
/// 構造 Nebula Graphd 用戶端連接配接需要的參數
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions
.NebulaClientOptionsBuilder()
.setAddress("127.0.0.1:3699")
.build();
NebulaConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);
// 構造 Nebula Graph 寫入操作參數
List<String> cols = Arrays.asList("name", "age")
ExecutionOptions sinkExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
.setGraphSpace("flinkSink")
.setTag(tag)
.setFields(cols)
.setIdIndex(0)
.setBatch(20)
.builder();
// 寫入 Nebula Graph
dataSource.addSink(nebulaSinkFunction);
Nebula Graph Sink 的 Demo 程式以 Nebula Graph 的 space:flinkSource 作為 Source 讀取資料,進行 map 類型轉換後 Sink 入 Nebula Graph 的 space:flinkSink,對應的應用場景為将 Nebula Graph 中一個 space 的資料流入另一個 space 中。
三、 Catalog
Flink 1.11.0 之前,使用者如果依賴 Flink 的 Source/Sink 讀寫外部資料源時,必須要手動讀取對應資料系統的 Schema。比如,要讀寫 Nebula Graph,則必須先保證明确地知曉在 Nebula Graph 中的 Schema 資訊。但是這樣會有一個問題,當 Nebula Graph 中的 Schema 發生變化時,也需要手動更新對應的 Flink 任務以保持類型比對,任何不比對都會造成運作時報錯使作業失敗。這個操作備援且繁瑣,體驗極差。
1.11.0 版本後,使用者使用 Flink Connector 時可以自動擷取表的 Schema。可以在不了解外部系統資料 Schema 的情況下進行資料比對。
目前 Nebula Flink Connector 中已支援資料的讀寫,要實作 Schema 的比對則需要為 Flink Connector 實作 Catalog 的管理。但為了確定 Nebula Graph 中資料的安全性,Nebula Flink Connector 隻支援 Catalog 的讀操作,不允許進行 Catalog 的修改和寫入。
通路 Nebula Graph 指定類型的資料時,完整路徑應該是以下格式:
<graphSpace>.<VERTEX.tag>
或者
<graphSpace>.<EDGE.edge>
具體使用方式如下:
String catalogName = "testCatalog";
String defaultSpace = "flinkSink";
String username = "root";
String password = "nebula";
String address = "127.0.0.1:45500";
String table = "VERTEX.player"
// define Nebula catalog
Catalog catalog = NebulaCatalogUtils.createNebulaCatalog(catalogName,defaultSpace, address, username, password);
// define Flink table environment
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(bsEnv);
// register customed nebula catalog
tEnv.registerCatalog(catalogName, catalog);
// use customed nebula catalog
tEnv.useCatalog(catalogName);
// show graph spaces of Nebula Graph
String[] spaces = tEnv.listDatabases();
// show tags and edges of Nebula Graph
tEnv.useDatabase(defaultSpace);
String[] tables = tEnv.listTables();
// check tag player exist in defaultSpace
ObjectPath path = new ObjectPath(defaultSpace, table);
assert catalog.tableExists(path) == true
// get nebula tag schema
CatalogBaseTable table = catalog.getTable(new ObjectPath(defaultSpace, table));
table.getSchema();
Nebula Flink Connector 支援的其他 Catalog 接口請檢視 GitHub 代碼 NebulaCatalog.java。
四、 Exactly-once
Flink Connector 的 Exactly-once 是指 Flink 借助于 checkpoint 機制保證每個輸入事件隻對最終結果影響一次,在資料處理過程中即使出現故障,也不會存在資料重複和丢失的情況。
為了提供端到端的 Exactly-once 語義,Flink 的外部資料系統也必須提供送出或復原的方法,然後通過 Flink 的 checkpoint 機制協調。Flink 提供了實作端到端的 Exactly-once 的抽象,即實作二階段送出的抽象類 TwoPhaseCommitSinkFunction。
想為資料輸出端實作 Exactly-once,則需要實作四個函數:
-
beginTransaction
在事務開始前,在目标檔案系統的臨時目錄建立一個臨時檔案,随後可以在資料處理時将資料寫入此檔案。
-
preCommit
在預送出階段,關閉檔案不再寫入。為下一個 checkpoint 的任何後續檔案寫入啟動一個新事務。
-
commit
在送出階段,将預送出階段的檔案原子地移動到真正的目标目錄。二階段送出過程會增加輸出資料可見性的延遲。
-
abort
在終止階段,删除臨時檔案。
根據上述函數可看出,Flink 的二階段送出對外部資料源有要求,即 Source 資料源必須具備重發功能,Sink 資料池必須支援事務送出和幂等寫。
Nebula Graph v1.1.0 雖然不支援事務,但其寫入操作是幂等的,即同一條資料的多次寫入結果是一緻的。是以可以通過 checkpoint 機制實作 Nebula Flink Connector 的 At-least-Once 機制,根據多次寫入的幂等性可以間接實作 Sink 的 Exactly-once。
要使用 Nebula Graph Sink 的容錯性,請確定在 Flink 的執行環境中開啟了 checkpoint 配置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000) // checkpoint every 10000 msecs
.getCheckpointConfig()
.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
Reference
- Nebula Source Demo [testNebulaSource]: https://github.com/vesoft-inc/nebula-java/blob/master/examples/src/main/java/org/apache/flink/FlinkDemo.java
- Nebula Sink Demo [testSourceSink]:
- Apache Flink 源碼: https://github.com/apache/flink
- ApacheFlink 零基礎入門: https://www.infoq.cn/theme/28
- Flink 文檔: https://flink.apache.org/flink-architecture.html
- Flink 實踐文檔: https://ci.apache.org/projects/flink/flink-docs-release-1.12/
- flink-connector-jdbc 源碼: https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-jdbc
- Flink JDBC Catalog 詳解: https://cloud.tencent.com/developer/article/1697913
喜歡這篇文章?來來來,給我們的
GitHub點個 star 表鼓勵啦~~ 🙇♂️🙇♀️ [手動跪謝]
交流圖資料庫技術?交個朋友,Nebula Graph 官方小助手微信:
NebulaGraphbot拉你進交流群~~