天天看點

從 Neo4j 導入 Nebula Graph 實踐見 SPark 資料導入原理

從 Neo4j 導入 Nebula Graph 實踐見 SPark 資料導入原理

本文主要講述如何使用資料導入工具 Nebula Graph Exchange 将資料從 Neo4j 導入到

Nebula Graph

Database。在講述如何實操資料導入之前,我們先來了解下 Nebula Graph 内部是如何實作這個導入功能的。

Nebula Graph Exchange 的資料處理原理

我們這個導入工具名字是 Nebula Graph

Exchange

,采用 Spark 作為導入平台,來支援海量資料的導入和保障性能。Spark 本身提供了不錯的抽象——DataFrame,使得可以輕松支援多種資料源。在 DataFrame 的支援下,添加新的資料源隻需提供配置檔案讀取的代碼和傳回 DataFrame 的 Reader 類,即可支援新的資料源。

從 Neo4j 導入 Nebula Graph 實踐見 SPark 資料導入原理

DataFrame 可以視為一種分布式存表格。DataFrame 可以存儲在多個節點的不同分區中,多個分區可以存儲在不同的機器上,進而支援并行操作。Spark 還提供了一套簡潔的 API 使使用者輕松操作 DataFrame 如同操作本地資料集一般。現在大多數資料庫提供直接将資料導出成 DataFrame 功能,即使某個資料庫并未提供此功能也可以通過資料庫 driver 手動建構 DataFrame。

Nebula Graph Exchange 将資料源的資料處理成 DataFrame 之後,會周遊它的每一行,根據配置檔案中 fields 的映射關系,按列名擷取對應的值。在周遊

batchSize

個行之後,Exchange 會将擷取的資料一次性寫入到 Nebula Graph 中。目前,Exchange 是通過生成 nGQL 語句再由 Nebula Client 異步寫入資料,下一步會支援直接導出 Nebula Graph 底層存儲的 sst 檔案,以擷取更好的性能。接下來介紹一下 Neo4j 資料源導入的具體實作。

Neo4j 資料導入具體實作

雖然 Neo4j 官方提供了可将資料直接導出為 DataFrame 的庫,但使用它讀取資料難以滿足斷點續傳的需求,我們未直接使用這個庫,而是使用 Neo4j 官方的 driver 實作資料讀取。Exchange 通過在不同分區調取 Neo4j driver 執行不同

skip

limit

的 Cypher 語句,将資料分布在不同的分區,來擷取更好的性能。這個分區數量由配置項

partition

指定。

Exchange 中的 Neo4jReader 類會先将使用者配置中的

exec

Cypher 語句,

return

後邊的語句替換成

count(*)

執行擷取資料總量,再根據分區數計算每個分區的起始偏移量和大小。這裡如果使用者配置了

check_point_path

目錄,會讀取目錄中的檔案,如果處于續傳的狀态,Exchange 會計算出每個分區應該的偏移量和大小。然後每個分區在 Cypher 語句後邊添加不同的

skip

limit

,調用 driver 執行。最後将傳回的資料處理成 DataFrame 就完成了 Neo4j 的資料導入。

過程如下圖所示:

從 Neo4j 導入 Nebula Graph 實踐見 SPark 資料導入原理

Neo4j 資料導入實踐

我們這裡導入示範的系統環境如下:

  • cpu name:Intel(R) Xeon(R) CPU E5-2697 v3 @ 2.60GHz
  • cpu cores:14
  • memory size:251G

軟體環境如下:

  • Neo4j:3.5.20 社群版
  • Nebula graph:docker-compose 部署,預設配置
  • Spark:單機版,版本為 2.4.6 pre-build for hadoop2.7

由于 Nebula Graph 是強 schema 資料庫,資料導入前需先進行建立 Space,建 Tag 和 Edge 的 schema,具體的文法可以參考

這裡

這裡建了名為 test 的 Space,副本數為 1。這裡建立了兩種 Tag 分别為 tagA 和 tagB,均含有 4 個屬性的點類型,此外,還建立一種名為 edgeAB 的邊類型,同樣含有 4 個屬性。具體的 nGQL 語句如下所示:

# 建立圖空間
CREATE SPACE test(replica_factor=1);
# 選擇圖空間 test
USE test;
# 建立标簽 tagA
CREATE TAG tagA(idInt int, idString string, tboolean bool, tdouble double);
# 建立标簽 tagB
CREATE TAG tagB(idInt int, idString string, tboolean bool, tdouble double);
# 建立邊類型 edgeAB
CREATE EDGE edgeAB(idInt int, idString string, tboolean bool, tdouble double);           

同時向 Neo4j 導入 Mock 資料——标簽為 tagA 和 tagB 的點,數量總共為 100 萬,并且導入了連接配接 tagA 和 tagB 類型點邊類型為 edgeAB 的邊,共 1000 萬個。另外需要注意的是,從 Neo4j 導出的資料在 Nebula Graph 中必須存在屬性,且資料對應的類型要同 Nebula Graph 一緻。

最後為了提升向 Neo4j 導入 Mock 資料的效率和 Mock 資料在 Neo4j 中的讀取效率,這裡為 tagA 和 tagB 的

idInt

屬性建了索引。關于索引需要注意 Exchange 并不會将 Neo4j 中的索引、限制等資訊導入到 Nebula Graph 中,是以需要使用者在執行資料寫入在 Nebula Graph 之後,自行

建立索引 REBUILD 索引

(為已有資料建立索引)。

接下來就可以将 Neo4j 資料導入到 Nebula Graph 中了,首先我們需要下載下傳和編譯打包項目,項目在

nebula-java

這個倉庫下 tools/exchange 檔案夾中。可執行如下指令:

git clone https://github.com/vesoft-inc/nebula-java.git
cd nebula-java/tools/exchange
mvn package -DskipTests           

然後就可以看到

target/exchange-1.0.1.jar

這個檔案。

接下來編寫配置檔案,配置檔案的格式為:HOCON(Human-Optimized Config Object Notation),可以基于

src/main/resources/server_application.conf

檔案的基礎上進行更改。首先對 nebula 配置項下的 address、user、pswd 和 space 進行配置,測試環境均為預設配置,是以這裡不需要額外的修改。然後進行 tags 配置,需要 tagA 和 tagB 的配置,這裡僅展示 tagA 配置,tagB 和 tagA 配置相同。

{
    # ======neo4j連接配接設定=======
    name: tagA
    # 必須和 Nebula Graph 的中 tag 名字一緻,需要在 Nebula Graph 中事先建好 tag
    server: "bolt://127.0.0.1:7687"
    # neo4j 的位址配置
    user: neo4j
    # neo4j 的使用者名
    password: neo4j
    # neo4j 的密碼

    encryption: false
    # (可選): 傳輸是否加密,預設值為 false
    database: graph.db
    # (可選): neo4j database 名稱,社群版不支援

    # ======導入設定============
    type: {
        source: neo4j
        # 還支援 PARQUET、ORC、JSON、CSV、HIVE、MYSQL、PULSAR、KAFKA...
        sink: client
        # 寫入 Nebula Graph 的方式,目前僅支援 client,未來會支援直接導出 Nebula Graph 底層資料庫檔案
    }

    nebula.fields: [idInt, idString, tdouble, tboolean]
    fields       : [idInt, idString, tdouble, tboolean]
    # 映射關系 fields,上方為 nebula 的屬性名,下方為 neo4j 的屬性名,一一對應
    # 映射關系的配置是 List 而不是 Map,是為了保持 fields 的順序,未來直接導出 nebula 底層存儲檔案時需要

    vertex: idInt
    # 作為 nebula vid 的 neo4j field,類型需要是整數(long or int)。

    partition: 10
    # 分區數
    batch: 2000
    # 一次寫入 nebula 多少資料

    check_point_path: "file:///tmp/test"
    # (可選): 儲存導入進度資訊的目錄,用于斷點續傳

    exec: "match (n:tagA) return n.idInt as idInt, n.idString as idString, n.tdouble as tdouble, n.tboolean as tboolean order by n.idInt"
}           

邊的設定大部分與點的設定無異,但由于邊在 Nebula Graph 中有起點的 vid 和終點的 vid 辨別,是以這裡需要指定作為邊起點 vid 的域和作為邊終點 vid 的域。

下面給出邊的特别配置。

source: {
  field: a.idInt
  # policy: "hash"
}
# 起點的 vid 設定
target: {
  field: b.idInt
  # policy: "uuid"
}
# 終點的 vid 設定

ranking: idInt
# (可選): 作為 rank 的 field

partition: 1
# 這裡分區數設定為 1,原因在後邊

exec: "match (a:tagA)-[r:edgeAB]->(b:tagB) return a.idInt, b.idInt, r.idInt as idInt, r.idString as idString, r.tdouble as tdouble, r.tboolean as tboolean order by id(r)"           

點的 vertex 和邊的 source、target 配置項下都可以設定 policy hash/uuid,它可以将類型為字元串的域作為點的 vid,通過 hash/uuid 函數将字元串映射成整數。

上面的例子由于作為點的 vid 為整數,是以并不需要 policy 的設定。hash/uuid的 差別請看

Cypher 标準

中如果沒有

order by

限制的話就不能保證每次查詢結果的排序一緻,雖然看起來即便不加

order by

Neo4j 傳回的結果順序也是不變的,但為了防止可能造成的導入時資料丢失,還是強烈建議在 Cypher 語句中加入

order by

,雖然這會增加導入的時間。為了提升導入效率,

order by

 語句最好選取有索引的屬性作為排序的屬性。如果沒有索引,也可觀察預設的排序,選擇合适的排序屬性以提高效率。如果預設的排序找不到規律,可以使用點/關系的 ID 作為排序屬性,并且将

partition

的值盡量設小,減少 Neo4j 的排序壓力,本文中邊

edgeAB

partition

 就設定為 1。

另外 Nebula Graph 在建立點和邊時會将 ID 作為唯一主鍵,如果主鍵已存在則會覆寫該主鍵中的資料。是以假如将某個 Neo4j 屬性值作為 Nebula Graph 的 ID,而這個屬性值在 Neo4j 中是有重複的,就會導緻“重複 ID”對應的資料有且隻有一條會存入 Nebula Graph 中,其它的則會被覆寫掉。由于資料導入過程是并發地往 Nebula Graph 中寫資料,最終儲存的資料并不能保證是 Neo4j 中最新的資料。

這裡還要留意下斷點續傳功能,在斷點和續傳之間,資料庫不應該改變狀态,如添加資料或删除資料,且

partition

 數量也不能更改,否則可能會有資料丢失。

最後由于 Exchange 需要在不同分區執行不同

skip

limit

的 Cypher 語句,是以使用者提供的 Cypher 語句不能含有

skip

limit

語句。

接下來就可以運作 Exchange 程式導資料了,執行如下指令:

$SPARK_HOME/bin/spark-submit  --class com.vesoft.nebula.tools.importer.Exchange --master "local[10]" target/exchange-1.0.1.jar -c /path/to/conf/neo4j_application.conf           

在上述這些配置下,導入 100 萬個點用時 13s,導入 1000 萬條邊用時 213s,總用時是 226s。

附:Neo4j 3.5 Community 和 Nebula Graph 1.0.1的一些比較

Neo4j 和 Nebula Graph 在系統架構、資料模型和通路方式上都有一些差異,下表列舉了常見的異同

從 Neo4j 導入 Nebula Graph 實踐見 SPark 資料導入原理
作者有話說:Hi,我是李夢捷,圖資料庫 Nebula Graph 的研發工程師,如果你對此文有疑問,歡迎來我們的 Nebula Graph 論壇交流下心得~~

喜歡這篇文章?來來來,給我們的

GitHub

點個 star 表鼓勵啦~~ 🙇‍♂️🙇‍♀️ [手動跪謝]

交流圖資料庫技術?交個朋友,Nebula Graph 官方小助手微信:

NebulaGraphbot

拉你進交流群~~

推薦閱讀