
鏡像下載下傳、域名解析、時間同步請點選
阿裡巴巴開源鏡像站作者:張友東
MongoDB 從3.6版本開始支援了 Change Stream 能力(4.0、4.2 版本在能力上做了很多增強),用于訂閱 MongoDB 内部的修改操作,change stream 可用于 MongoDB 之間的增量資料遷移、同步,也可以将 MongoDB 的增量訂閱應用到其他的關聯系統;比如電商場景裡,MongoDB 裡存儲新的訂單資訊,業務需要根據新增的訂單資訊去通知庫存管理系統發貨。
一、Change Stream 與 Tailing Oplog 對比
在 change stream 功能之前,如果要擷取 MongoDB 增量的修改,可以通過不斷
tailing oplog
的方式來
拉取增量的 oplog,然後針對拉取到的 oplog 集合,來過濾滿足條件的 oplog。這種方式也能滿足絕大部分場景的需求,但存在如下的不足。
- 使用門檻較高,使用者需要針對 oplog 集合,打開特殊選項的的 tailable cursor ("tailable": true, "awaitData" : true)。
- 使用者需要自己管理增量續傳,當拉取應用 crash 時,使用者需要記錄上一條拉取oplog的 ts、h 等字段,在下一次先定位到指定 oplog 再繼續拉取。
- 結果過濾必須在拉取側完成,但隻需要訂閱部分 oplog 時,比如針對某個 DB、某個 Collection、或某種類型的操作,必須要把左右的 oplog 拉取到再進行過濾。
- 對于 update 操作,oplog 隻包含操作的部分内容,比如
,而應用經常需要擷取到完整的文檔内容。{$set: {x: 1}}
- 不支援 Sharded Cluster 的訂閱,使用者必須針對每個 shard 進行 tailing oplog,并且這個過程中不能有 moveChunk 操作,否則結果可能亂序。
MongoDB Change Stream 解決了 Tailing oplog 存在的不足,具有以下特點。
- 簡單易用,提供統一的 Change Stream API,一次 API 調用,即可從 MongoDB Server 側擷取增量修改。
- 統一的進度管理,通過 resume token 來辨別拉取位置,隻需在 API 調用時,帶上上次結果的 resume token,即可從上次的位置接着訂閱。
- 支援對結果在 Server 端進行 pipeline 過濾,減少網絡傳輸,支援針對 DB、Collection、OperationType 等次元進行結果過濾。
- 支援 fullDocument: "updateLookup" 選項,對于 update,傳回當時對應文檔的完整内容。
- 支援 Sharded Cluster 的修改訂閱,相同的 API 請求發到 mongos ,即可擷取叢集次元全局有序的修改。
二、Change Stream 實戰
以 Mongo shell 為例,使用 Change Stream 非常簡單,mongo shell 封裝了針對整個執行個體、DB、Collection 級别的訂閱操作。
db.getMongo().watch() 訂閱整個執行個體的修改
db.watch() 訂閱指定DB的修改
db.collection.watch() 訂閱指定Collection的修改
1.建立連接配接1發起訂閱操作
mytest:PRIMARY>db.coll.watch([], {maxAwaitTimeMS: 60000}) 最多阻塞等待 1分鐘
2.建立連接配接2寫入新資料
mytest:PRIMARY> db.coll.insert({x: 100})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 101})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 102})
WriteResult({ "nInserted" : 1 })
3.連接配接1上收到 Change Stream 更新
mytest:PRIMARY> db.watch([], {maxAwaitTimeMS: 60000})
{ "_id" : { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934389, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9"), "x" : 100 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9") } }
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }
4.上述 ChangeStream 結果裡,_id 字段的内容即為 resume token,辨別着 oplog 的某個位置,如果想從某個位置繼續訂閱,在 watch 時,通過 resumeAfter 指定即可。比如每個應用訂閱了上述3條修改,但隻有第一條已經成功消費了,下次訂閱時指定第一條的 resume token 即可再次訂閱到接下來的2條。
mytest:PRIMARY> db.coll.watch([], {maxAwaitTimeMS: 60000, resumeAfter: { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }})
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }
三、Change Stream 内部實作
1. watch() wrapper
db.watch() 實際上是一個 API wrapper,實際上 Change Stream 在 MongoDB 内部實際上是一個 aggregation 指令,隻是加了一個特殊的
$changestream
階段,在發起 change stream 訂閱操作後,可通過 db.currentOp() 看到對應的 aggregation/getMore 操作的詳細參數。
{
"op" : "getmore",
"ns" : "test.coll",
"command" : {
"getMore" : NumberLong("233479991942333714"),
"collection" : "coll",
"maxTimeMS" : 50000,
"lsid" : {
"id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")
},
},
"planSummary" : "COLLSCAN",
"cursor" : {
"cursorId" : NumberLong("233479991942333714"),
"createdDate" : ISODate("2019-12-31T06:35:52.479Z"),
"lastAccessDate" : ISODate("2019-12-31T06:36:09.988Z"),
"nDocsReturned" : NumberLong(1),
"nBatchesReturned" : NumberLong(1),
"noCursorTimeout" : false,
"tailable" : true,
"awaitData" : true,
"originatingCommand" : {
"aggregate" : "coll",
"pipeline" : [
{
"$changeStream" : {
"fullDocument" : "default"
}
}
],
"cursor" : {
},
"lsid" : {
"id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")
},
"$clusterTime" : {
"clusterTime" : Timestamp(1577774144, 1),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
},
"$db" : "test"
},
"operationUsingCursorId" : NumberLong(7019500)
},
"numYields" : 2,
"locks" : {
}
}
2. resume token
resume token 用來描述一個訂閱點,本質上是 oplog 資訊的一個封裝,包含 clusterTime、uuid、documentKey等資訊,當訂閱 API 帶上 resume token 時,MongoDB Server 會将 token 轉換為對應的資訊,并定位到 oplog 起點繼續訂閱操作。
struct ResumeTokenData {
Timestamp clusterTime;
int version = 0;
size_t applyOpsIndex = 0;
Value documentKey;
boost::optional<UUID> uuid;
};
ResumeTokenData 結構裡包含 version 資訊,在 4.0.7 以前的版本,version 均為0; 4.0.7 引入了一種新的 resume token 格式,version 為 1; 另外在 3.6 版本裡,Resume Token 的編碼與 4.0 也有所不同;是以在版本更新後,有可能出現不同版本 token 無法識别的問題,是以盡量要讓 MongoDB Server 所有元件(Replica Set 各個成員,ConfigServer、Mongos)都保持相同的核心版本。
更詳細的資訊,參考
https://docs.mongodb.com/manual/reference/method/Mongo.watch/#resumability3. updateLookup
Change Stream 支援針對 update 操作,擷取目前的文檔完整内容,而不是僅更新操作本身,比如
mytest:PRIMARY> db.coll.find({_id: 101})
{ "_id" : 101, "name" : "jack", "age" : 18 }
mytest:PRIMARY> db.coll.update({_id: 101}, {$set: {age: 20}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
上面的 update 操作,預設情況下,change stream 會收到
{_id: 101}, {$set: {age: 20}
的内容,而并不會包含這個文檔其他未更新字段的資訊;而加上 fullDocument: "updateLookup" 選項後,Change Stream 會根據文檔 _id 去查找文檔目前的内容并傳回。
需要注意的是,updateLookup 選項隻能保證最終一緻性,比如針對上述文檔,如果連續更新100次,update 的 change stream 并不會按順序收到中間每一次的更新,因為每次都是去查找文檔目前的内容,而目前的内容可能已經被後續的修改覆寫。
4. Sharded cluster
Change Stream 支援針對 sharded cluster 進行訂閱,會保證全局有序的傳回結果;為了達到全局有序這個目标,mongos 需要從每個 shard 都傳回訂閱結果按時間戳進行排序合并傳回。
在極端情況下,如果某些 shard 寫入量很少或者沒有寫入,change stream 的傳回延時會受到影響,因為需要等到所有 shard 都傳回訂閱結果;預設情況下,mongod server 每10s會産生一條 Noop 的特殊oplog,這個機制會間接驅動 sharded cluster 在寫入量不高的情況下也能持續運轉下去。
由于需要全局排序,在 sharded cluster 寫入量很高時,Change Stream 的性能很可能跟不上;如果對性能要求非常高,可以考慮關閉 Balancer,在每個 shard 上各自建立 Change Stream。
四、參考資料
“ 提供全面,高效和穩定的系統鏡像、應用軟體下載下傳、域名解析和時間同步服務。”