本文正在參加星光計劃3.0–夏日挑戰賽
作者:蔣衛峰
1、分布式資料對象概述
分布式資料對象管理架構是一款面向對象的記憶體資料管理架構,向應用開發者提供記憶體對象的建立、查詢、删除、修改、訂閱等基本資料對象的管理能力,同時具備分布式能力,滿足超級終端場景下,相同應用多裝置間的資料對象協同需求。
分布式資料對象提供JS接口,讓開發者能以使用本地對象的方式使用分布式對象。分布式資料對象支援的資料類型包括數字型、字元型、布爾型等基本類型,同時也支援數組、基本類型嵌套等複雜類型。
限制與限制:不同裝置間隻有相同bundleName的應用才能直接同步;不建議建立過多分布式對象,每個分布式對象将占用100-150KB記憶體;每個對象大小不超過500KB;如對複雜類型的資料進行修改,僅支援修改根屬性,暫不支援下級屬性修改。
分布式資料對象依賴于分布式資料服務。
項目路徑:https://gitee.com/openharmony/distributeddatamgr_objectstore
1.1、分布式資料對象JS對外接口清單
import distributedObject from '@ohos.data.distributedDataObject'
接口
接口名稱 | 描述 |
---|---|
function createDistributedObject(source: object): DistributedObject; | 建分布式對象source中指定分布式對象中的屬性傳回值是建立出的分布式對象,接口見DistrubutedObject。 |
接口名稱 | 描述 |
---|---|
function createDistributedObject(source: object): DistributedObject; | 建立分布式對象source中指定分布式對象中的屬性傳回值是建立出的分布式對象,接口見DistrubutedObject。 |
function genSessionId(): string; | 随機建立sessionId,傳回值是随機建立的sessionId。 |
DistributedObject
接口名稱 | 描述 |
---|---|
setSessionId(sessionId: string): boolean; | 設定同步的sessionId,可信組網中有多個裝置時,多個裝置間的對象如果設定為同一個sessionId,就能自動同步sessionId是指定的sessionId,如果要退出分布式組網,設定為“”或不設定均可,傳回值是操作結果,true辨別設定session成功。 |
on(type: 'change', callback: Callback<{ sessionId: string, fields: Array }>): void; | 監聽對象的變更type,固定為'change',callback是變更時觸發的回調,回調參數sessionId辨別變更對象的sessionId,fields辨別對象變更的屬性名。 |
off(type: 'change', callback: Callback<{ sessionId: string, fields: Array } | 删除對象的變更監聽,type固定為'change',callback為可選參數,不設定表示删除該對象所有變更監聽。 |
on(type: 'status', callback: Callback<{ sessionId: string, networkId: string, status: 'online' | 'offline' }>): void | 監聽對象的變更,type固定為'status',callback是變更時觸發的回調,回調參數sessionId辨別變更對象的sessionId,networkId辨別對象裝置的networkId,status辨別對象為'online'(上線)或'offline'(下線)的狀态。 |
off(type: 'status', callback: Callback<{ sessionId: string, deviceId: string, status: 'online' | 'offline' }>): void | 删除對象的變更監聽,type固定為'change',callback為可選參數,不設定表示删除該對象所有上下線監聽。 |
1.2、簡單類圖

2、接口詳述
架構
JS接口C++部分frameworks\jskitsimpl\src\adaptor\js_module_init.cpp
```c++
static napi_value DistributedDataObjectExport(napi_env env, napi_value exports)
{
napi_status status;
static napi_property_descriptor desc[] = {
DECLARE_NAPI_FUNCTION("createObjectSync", JSDistributedObjectStore::JSCreateObjectSync),
DECLARE_NAPI_FUNCTION("destroyObjectSync", JSDistributedObjectStore::JSDestroyObjectSync),
DECLARE_NAPI_FUNCTION("on", JSDistributedObjectStore::JSOn),
DECLARE_NAPI_FUNCTION("off", JSDistributedObjectStore::JSOff),
DECLARE_NAPI_FUNCTION("recordCallback", JSDistributedObjectStore::JSRecordCallback),
DECLARE_NAPI_FUNCTION("deleteCallback", JSDistributedObjectStore::JSDeleteCallback),
};
status = napi_define_properties(env, exports, sizeof(desc) / sizeof(desc[0]), desc);
CHECK_EQUAL_WITH_RETURN_NULL(status, napi_ok);
return exports;
}
// storage module define
static napi_module storageModule = {
.nm_version = 1,
.nm_flags = 0,
.nm_filename = nullptr,
.nm_register_func = DistributedDataObjectExport,
.nm_modname = "data.distributedDataObject",
.nm_priv = ((void *)0),
.reserved = { 0 },
};
// distributeddataobject module register
static __attribute__((constructor)) void RegisterModule()
{
napi_module_register(&storageModule);
}
```
JS接口JS部分:interfaces\jskits\distributed_data_object.js
```javascript
class Distributed {
constructor(obj) {
this.__proxy = obj;
Object.keys(obj).forEach(key => {
Object.defineProperty(this, key, {
enumerable: true,
configurable: true,
get: function () { return this.__proxy[key]; },
set: function (newValue) {
this.__proxy[key] = newValue; } }); });
Object.defineProperty(this, SESSION_ID, {
enumerable: true,
configurable: true,
get: function () { return this.__proxy[SESSION_ID]; },
set: function (newValue) {
this.__proxy[SESSION_ID] = newValue; } });
this.__objectId = randomNum();
console.info("constructor success ");
};
setSessionId(sessionId) {
if (sessionId == null || sessionId == "") {
leaveSession(this.__proxy); return false; }
if (this.__proxy[SESSION_ID] == sessionId) {
console.info("same session has joined " + sessionId);
return true;
}
leaveSession(this.__proxy);
let object = joinSession(this.__proxy, this.__objectId, sessionId);
if (object != null) { this.__proxy = object; return true; }
return false;
};
on(type, callback) { onWatch(type, this.__proxy, callback);
distributedObject.recordCallback(type, this.__objectId, callback);
};
off(type, callback) { offWatch(type, this.__proxy, callback);
distributedObject.deleteCallback(type, this.__objectId, callback);
};
__proxy;
__objectId;
}
export default {
createDistributedObject: newDistributed,
genSessionId: randomNum
}
```
2.1、createDistributedObject
```javascript
// 建立對象,對象包含3個基本屬性:name,age和isVis;2個複雜屬性:parent,list
var g_object = distributedObject.createDistributedObject({name:undefined, age:undefined, isVis:true,
parent:undefined, list:undefined});
```
建立一個 Distributed 對象,此對象為node.js類 Distributed的對象。
此類有兩個成員:__proxy 存儲傳入的參數對象,目前對象添加傳入的參數各個key為屬性名的屬性,每個屬性定義get和set方法;并加上 __sessionId 為屬性名的屬性,定義get和set屬性。 __objectId 存儲對象ID,此值為一個随機值。傳回建立的對象。
```javascript
constructor(obj) {
this.__proxy = obj;
Object.keys(obj).forEach(key => {
Object.defineProperty(this, key, {
enumerable: true,
configurable: true,
get: function () {
return this.__proxy[key];
},
set: function (newValue) {
this.__proxy[key] = newValue;
}
});
});
Object.defineProperty(this, SESSION_ID, {
enumerable: true,
configurable: true,
get: function () {
return this.__proxy[SESSION_ID];
},
set: function (newValue) {
this.__proxy[SESSION_ID] = newValue;
}
});
this.__objectId = randomNum();
console.info("constructor success ");
};
```
2.2、DistributedObject::genSessionId
擷取長度為8的由随機數字構成的字元串。
2.3、DistributedObject::setSessionId
發起方
javascript g_object.setSessionId(distributedObject.genSessionId());
被拉起方
javascript //sessionId與發起方的__sessionId一緻 g_object.setSessionId(sessionId);
設定同步的sessionId,多個可信組網中有多個裝置時,多個裝置間的對象如果設定為同一個sessionId,就能自動同步。如果要退出分布式組網,設定為“”或不設定均可。
如果sessionId和緩存存儲的sessionId是一樣的,直接傳回true,如果不相同,先調用 distributedObject.destroyObjectSync(obj) 删除舊的sessionId,再調用 distributedObject.createObjectSync(sessionId, objectId) 利用sessionId建立新的資料對象。然後緩存存儲新的sessionId。
2.3.1、建立資料對象
distributedObject.createObjectSync(js_module_init.cpp)流程:
通過上面函數調用等操作, Distributed 對象的成員 this.__proxy 的各個屬性的get和put方法和接口導出的get和put關聯起來了 JSObjectWrapper 内部存的 object_ 對象實際為 DistributedObjectImpl 對象。
2.3.2、銷毀資料對象
JSDistributedObjectStore::JSDestroyObjectSync 流程
2.4、DistributedObject::on()
2.4.1、資料更新監聽建立
```javascript
changeCallback : function (sessionId, changeData) {
console.info("change" + sessionId + " " + this.response);
if (changeData != null && changeData != undefined) {
changeData.forEach(element => {
console.info("changed !" + element + " " + g_object[element]);
});
}
}
g_object.on("change", this.changeCallback);
```
開啟對對象資料變更的監聽,type固定為'change',callback是變更時觸發的回調,回調參數sessionId辨別變更對象的sessionId,fields辨別對象變更的屬性名。
JS接口JS部分流程
- onWatch(type, this.__proxy, callback)
- distributedObject.on(type, obj, callback) 調用JS接口C++ 部分的函數,參數type為'change',obj為資料對象,callback為回調函數。
- distributedObject.recordCallback(type, this.__objectId, callback) 調用JS接口C++ 部分的函數,參數type為'change', __objectId 為資料對象的ID号,callback為回調函數。添加回調函數到全局回調map中。
JS接口C++部分流程
JSDistributedObjectStore::JSOn
JSDistributedObjectStore::JSRecordCallback 流程
使用napi解析傳過來的參數type,objectId,callbackType調用 AddCallback(env, g_changeCallBacks, objectId, argv[2]) 添加回調到對應的全局map回調中。
JSDistributedObjectStore::AddCallback 函數:
c++ void JSDistributedObjectStore::AddCallback(napi_env env, std::map<std::string, std::list<napi_ref>> &callbacks, const std::string &objectId, napi_value callback) { LOG_INFO("add callback %{public}s", objectId.c_str()); napi_ref ref = nullptr; napi_status status = napi_create_reference(env, callback, 1, &ref); CHECK_EQUAL_WITH_RETURN_VOID(status, napi_ok); if (callbacks.count(objectId) != 0) { auto lists = callbacks.at(objectId); lists.push_back(ref); callbacks.insert_or_assign(objectId, lists); } else { std::list<napi_ref> lists = { ref }; callbacks.insert_or_assign(objectId, lists); } }
napi_status status = napi_create_reference(env, callback, 1, &ref) 建立回調的引用,添加到key為mapobjectId對應的list中。即一個objectId可以對應多個回調引用,存儲再一個list中。
2.4.2、上下線更新監聽建立
```javascript
statusCallback : function (sessionId, networkid, status) {
this.response += "status changed " + sessionId + " " + status + " " + networkId;
}
g_object.on("status", this.changeCallback);
```
開啟對對象上下線變更的監聽,type固定為'status',callback是變更時觸發的回調,回調參數sessionId辨別變更對象的sessionId,fields辨別對象變更的屬性名。
JS接口JS部分流程
-
onWatch(type, this.__proxy, callback) distributedObject.on(type, obj, callback) 調用JS接口C++
部分的函數,參數type為'status',obj為資料對象,callback為回調函數。
distributedObject.recordCallback(type, this.__objectId, callback) 調用JS接口C++ 部分的函數,參數type為'status', __objectId 為資料對象的ID号,callback為回調函數。添加回調函數到全局回調map中。
JS接口C++部分流程
JSDistributedObjectStore::JSOn
JSDistributedObjectStore::JSRecordCallback 流程
使用napi解析傳過來的參數type,objectId,callbackType調用 AddCallback(env, g_statusCallBacks, objectId, argv[2]) 添加回調到對應的全局map回調中。
2.5、DistributedObject::off()
2.5.1、資料更新監聽删除
```javascript
//删除變更回調changeCallback
g_object.off("change", changeCallback);
//删除所有的變更回調
g_object.off("change");
```
删除對資料對象的資料變更監聽,type固定為'change',callback為可選參數,不設定表示删除該對象所有變更監聽。
JS接口JS部分流程
offWatch(type, this.__proxy, callback)
distributedObject.off(type, obj, callback) 調用JS接口C++ 部分函數,參數type為'change',obj為資料對象,callback為回調函數。
distributedObject.deleteCallback(type, this.__objectId, callback) 調用JS接口C++ 部分函數,參數type為'change',__objectId為資料對象的ID号,callback為回調函數。從全局回調map中删除回調函數,要是callback為空,則删除此ID号的全部回調函數。
JS接口C++ 部分流程
JSDistributedObjectStore::JSOff
使用napi解析擷取傳過來的參數type,使用napi_unwrap根據傳入的參數擷取之前綁定的wrapper對象。如果第三個參數為空,DeleteWatch參數handler為nullptr。
調用 JSWatcher::Off 進行删除。
參數handler為nullptr先調用 ChangeEventListener::Clear(napi_env env) 删除所有回調。
```c++
void ChangeEventListener::Clear(napi_env env)
{
EventListener::Clear(env);
if (isWatched_ && object_ != nullptr) {
uint32_t ret = objectStore_->UnWatch(object_);
if (ret != SUCCESS) {
LOG_ERROR("UnWatch %{public}s error", object_->GetSessionId().c_str());
} else {
LOG_INFO("UnWatch %{public}s success", object_->GetSessionId().c_str());
isWatched_ = false;
}
}
}
```
參數handler不為nullptr先調用 ChangeEventListener::Del(napi_env env, napi_value handler) 删除對應的handler。
```c++
bool ChangeEventListener::Del(napi_env env, napi_value handler)
{
bool isEmpty = EventListener::Del(env, handler);
if (isEmpty && isWatched_ && object_ != nullptr) {
uint32_t ret = objectStore_->UnWatch(object_);
if (ret != SUCCESS) {
LOG_ERROR("UnWatch %{public}s error", object_->GetSessionId().c_str());
} else {
LOG_INFO("UnWatch %{public}s success", object_->GetSessionId().c_str());
isWatched_ = false;
}
}
return isEmpty;
}
```
再調用 *DistributedObjectStoreImpl::UnWatch(DistributedObject object) 删除分布式資料服務内的監聽。
```c++
uint32_t DistributedObjectStoreImpl::UnWatch(DistributedObject *object)
{
if (object == nullptr) {
LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
return ERR_NULL_OBJECT;
}
if (flatObjectStore_ == nullptr) {
LOG_ERROR("DistributedObjectStoreImpl::Sync object err ");
return ERR_NULL_OBJECTSTORE;
}
uint32_t status = flatObjectStore_->UnWatch(object->GetSessionId());
if (status != SUCCESS) {
LOG_ERROR("DistributedObjectStoreImpl::Watch failed %{public}d", status);
return status;
}
watchers_.erase(object);
LOG_INFO("DistributedObjectStoreImpl:UnWatch object success.");
return SUCCESS;
}
```
```c++
uint32_t FlatObjectStore::UnWatch(const std::string &sessionId)
{
if (!storageEngine_->isOpened_) {
LOG_ERROR("FlatObjectStore::DB has not inited");
return ERR_DB_NOT_INIT;
}
uint32_t status = storageEngine_->UnRegisterObserver(sessionId);
if (status != SUCCESS) {
LOG_ERROR("FlatObjectStore::Watch failed %{public}d", status);
}
return status;
}
```
```c++
uint32_t FlatObjectStorageEngine::UnRegisterObserver(const std::string &key)
{
if (!isOpened_) {
LOG_ERROR("FlatObjectStorageEngine::RegisterObserver kvStore has not init");
return ERR_DB_NOT_INIT;
}
std::unique_lock<std::shared_mutex> lock(operationMutex_);
if (delegates_.count(key) == 0) {
LOG_INFO("FlatObjectStorageEngine::RegisterObserver %{public}s not exist", key.c_str());
return ERR_DB_NOT_EXIST;
}
auto iter = observerMap_.find(key);
if (iter == observerMap_.end()) {
LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver observer not exist.");
return ERR_NO_OBSERVER;
}
auto delegate = delegates_.at(key);
std::shared_ptr<TableWatcher> watcher = iter->second;
LOG_INFO("start UnRegisterObserver %{public}s", key.c_str());
DistributedDB::DBStatus status = delegate->UnRegisterObserver(watcher.get());
if (status != DistributedDB::DBStatus::OK) {
LOG_ERROR("FlatObjectStorageEngine::UnRegisterObserver unRegister err %{public}d", status);
return ERR_UNRIGSTER;
}
LOG_INFO("end UnRegisterObserver %{public}s", key.c_str());
observerMap_.erase(key);
return SUCCESS;
}
```
JSDistributedObjectStore::JSDeleteCallback
使用napi解析擷取傳過來的參數type,objectId如果參數為兩個調用 DelCallback(env, g_changeCallBacks, objectId) 進行注冊的回調函數的删除,如果有第三個參數調用 DelCallback(env, g_changeCallBacks, objectId, argv[2]) 進行注冊的回調函數的删除。
對于函數DelCallback如果第三個參數為nullptr則清空此ID的所有注冊的回調函數。
否則删除對應注冊的回調函數。如果是最後一個則把這個ID對象也删除了。
```c++
void JSDistributedObjectStore::DelCallback(napi_env env, std::map<std::string, std::list<napi_ref>> &callbacks,
const std::string &sessionId, napi_value callback)
{
LOG_INFO("del callback %{public}s", sessionId.c_str());
napi_status status;
if (callback == nullptr) {
if (callbacks.count(sessionId) != 0) {
for (auto ref : callbacks.at(sessionId)) {
status = napi_delete_reference(env, ref);
CHECK_EQUAL_WITH_RETURN_VOID(status, napi_ok);
}
callbacks.erase(sessionId);
}
return;
}
napi_value callbackTmp;
if (callbacks.count(sessionId) != 0) {
auto lists = callbacks.at(sessionId);
for (auto iter = lists.begin(); iter != lists.end();) {
status = napi_get_reference_value(env, *iter, &callbackTmp);
CHECK_EQUAL_WITH_RETURN_VOID(status, napi_ok);
bool isEquals = false;
napi_strict_equals(env, callbackTmp, callback, &isEquals);
if (isEquals) {
napi_delete_reference(env, *iter);
iter = lists.erase(iter);
} else {
iter++;
}
}
if (lists.empty()) {
callbacks.erase(sessionId);
} else {
callbacks.insert_or_assign(sessionId, lists);
}
}
}
```
2.5.2、上下線更新監聽删除
```javascript
statusCallback : function (sessionId, networkid, status) {
this.response += "status changed " + sessionId + " " + status + " " + networkId;
}
g_object.on("status", this.changeCallback);
```
删除對象的上下線變更監聽,type固定為'status',callback為可選參數,不設定表示删除該對象所有上下線監聽。
JS接口JS部分
- offWatch(type, this.__proxy, callback)
- distributedObject.off(type, obj, callback) 調用JS接口C++ 部分函數,參數type為'status',obj為資料對象,callback為回調函數。
- distributedObject.deleteCallback(type, this.__objectId, callback) 調用JS接口C++ 部分函數,參數type為'status', __objectId 為資料對象的ID号,callback為回調函數。從全局回調map中删除回調函數,要是callback為空,則删除此ID号的全部回調函數。
JS接口C++ 部分
JSDistributedObjectStore::JSOff
使用napi解析擷取傳過來的參數type,使用napi_unwrap根據傳入的參數擷取之前綁定的wrapper對象。如果第三個參數為空,DeleteWatch參數handler為nullptr。
調用 JSWatcher::Off 進行删除。
參數handler為nullptr先調用 StatusEventListener::Clear(napi_env env) 删除所有回調
參數handler不為nullptr調用 StatusEventListener::Del(napi_env env, napi_value handler) 删除對應的handler。
然後調用 NotifierImpl::GetInstance()->DelWatcher(sessionId_) 删除監控
JSDistributedObjectStore::JSDeleteCallback
使用napi解析擷取傳過來的參數type,objectId如果參數為兩個調用 DelCallback(env, g_statusCallBacks, objectId) 進行注冊的回調函數的删除,如果有第三個參數調用 DelCallback(env, g_statusCallBacks, objectId, argv[2]) 進行注冊的回調函數的删除。
對于函數DelCallback如果第三個參數為nullptr則清空此ID的所有注冊的回調函數。
否則删除對應注冊的回調函數。如果是最後一個則把這個ID對象也删除了。
2.6、對象屬性的get和put
在 DistributedObject::setSessionId 接口中給對象屬性都綁定了C++ 接口部分對應的接口函數。
在給屬性指派或修改值是會調用 JSDistributedObject::JSPut ,在擷取屬性的值時會調用 JSDistributedObject::JSGet 。
```c++
const char *distributedObjectName = "DistributedObject";
napi_property_descriptor distributedObjectDesc[] = {
DECLARE_NAPI_FUNCTION("put", JSDistributedObject::JSPut),
DECLARE_NAPI_FUNCTION("get", JSDistributedObject::JSGet),
};
napi_status status = napi_define_class(env, distributedObjectName, strlen(distributedObjectName),
JSDistributedObject::JSConstructor, nullptr, sizeof(distributedObjectDesc) / sizeof(distributedObjectDesc[0]),
distributedObjectDesc, &distributedObjectClass);
CHECK_EQUAL_WITH_RETURN_NULL(status, napi_ok);
if (g_instance == nullptr) {
g_instance = new napi_ref;
status = napi_create_reference(env, distributedObjectClass, 1, g_instance);
CHECK_EQUAL_WITH_RETURN_NULL(status, napi_ok);
}
```
2.6.1、JSDistributedObject::JSPut
```javascript
g_object.name = "jack";
g_object.age = 19;
g_object.isVis = false;
g_object.parent = {mother:"jack mom",father:"jack Dad"};
g_object.list = [{mother:"jack mom"}, {father:"jack Dad"}];
// 對端裝置收到change回調,fields為name,age,isVis,parent和list
```
2.6.2、JSDistributedObject::JSGet
```javascript
console.info("name " + g_object["name"]); //通路到的是組網内最新資料
```
3、總結
本文使用OpenHarmony 3.1版本的分布式資料對象子產品源碼,對子產品提供的JS接口進行了源碼級的介紹,對主要JS接口功能的實作流程也進行深入的介紹。分布式資料對象服務依賴于分布式資料管理,分布式軟總線,系統能力管理等子產品,感興趣的讀者可以研究下其他的子產品的原理,可以更加友善的了解分布式資料對象。
本文一些JS例子代碼來源于分布式對象使用者手冊。
更多原創内容請關注:深開鴻技術團隊
入門到精通、技巧到案例,系統化分享HarmonyOS開發技術,歡迎投稿和訂閱,讓我們一起攜手前行共建鴻蒙生态。