天天看点

鸿蒙子系统解读-分布式任务调度篇

鸿蒙子系统解读-分布式任务调度篇

本文作者:江苏润和软件股份有限公司 郎建中

1.总体描述

1.1.总体介绍

分布式任务调度基于分布式软总线、分布式数据管理、分布式Profile等技术特性,构建统一的分布式服务管理(发现、同步、注册、调用)机制,支持对跨设备的应用进行远程启动、远程调用、远程连接以及迁移等操作,能够根据不同设备的能力、位置、业务运行状态、资源使用情况,以及用户的习惯和意图,选择合适的设备运行分布式任务。

下图是分布式调度子系统在整个鸿蒙系统中的位置:

鸿蒙子系统解读-分布式任务调度篇

下图表示分布式调度的示意图:

鸿蒙子系统解读-分布式任务调度篇

从A设备的某个FA(Feature Ability代表有界面的元能力)应用调用设备B上的FA应用。这里的调用的含义包含了:

a、启动和关闭:启动和关闭远程设备上的ability(包括:基于Page的ability、基于Service的ability、基于Data模板的ability)

b、连接和断开:向开发者提供跨设备控制服务的能力。这里的服务表示:基于Server和Data模板的ability。

c、迁移能力:向开发者提供跨设备的业务无缝迁移能力。开发者可以通过基于Page的ability的迁移接口,将本地的业务迁移到指定的设备中。

1.2.分布式调度中的两种设备

在分布式调度中,存在两个角色。按照上图有设备A和设备B。一般来说设备A是指智慧屏设备,设备B只一般的轻量设备。智慧屏设备一般指智能TV、手机等。轻量设备一般只Camera、手表等

下面图示表示这两种设备的系统架构图:

鸿蒙子系统解读-分布式任务调度篇

(约束:如果要实现分布式调度,目前智慧屏设备和轻量设备必须处于同一个局域网段内)

从鸿蒙系统的整体系统框架图可以看出,分布式调度子系统及周边的依赖模块如下图:

鸿蒙子系统解读-分布式任务调度篇

1.3.分布式调度代码示例–启动远程FA

1.3.1.智慧屏上的代码示例

1.获取目标在线从设备的设备ID

// 引入设备选择头文件import ohos.distributedschedule.interwork.DeviceInfo;import ohos.distributedschedule.interwork.DeviceManager;// 获取在线设备列表List<DeviceInfo> deviceInfoListOnline = DeviceManager.getDmsDeviceList(DeviceInfo.FLAG_GET_ONLINE_DEVICE);String remote_device_id;if (deviceInfoListOnline.size() > 0){
    remote_device_id = deviceInfoListOnline[0].GetDeviceId(); // 获取在线列表中第一台设备的设备ID}12345678910111213      

2.构造want,首先使用ElementName类表明需要启动的远端设备ID,包名,元能力类名,传入want中,然后设置want中的分布式标志位Want.FLAG_ABILITYSLICE_MULTI_DEVICE表示需要远程启动

// 引入相关头文件import ohos.aafwk.ability.Ability;import ohos.aafwk.content.Want;import ohos.bundle.ElementName;// 启动远程设备FAWant want = new Want(); // 封装启动远端FA的Want// 使用步骤2中获取的设备ID,并指定FA信息ElementName name = new ElementName(remote_device_id, "com.huawei.remote_package_name", "remote_class_name"); want.setElement(name); // 将待启动的FA信息添加到Want中want.setFlags(Want.FLAG_ABILITYSLICE_MULTI_DEVICE); // 设置分布式标记,若不设置将无法使用分布式能力startAbility(want); // 按照Want启动指定FA,Want参数命名以实际开发平台API为准123456789101112131415      

1.3.2.轻量设备上的代码示例

轻量设备上代码可以参考鸿蒙Java的API参考手册中,如何创建基于Page的Ability。

https://developer.harmonyos.com/cn/docs/documentation/doc-guides/ability-page-concepts-0000000000033573

2.代码目录结构

分布式调度的代码在foundation/distributedschedule目录中,目录结构如下:

鸿蒙子系统解读-分布式任务调度篇

其中interfaces中包含了所有的头文件,如下:

鸿蒙子系统解读-分布式任务调度篇

Services目录下包含了如下目录:

鸿蒙子系统解读-分布式任务调度篇

其中,dtbschedmgr_lite目录是轻量级分布式调度模块代码。

鸿蒙子系统解读-分布式任务调度篇

safwk_lite目录中是foundation 这个bin文件的main函数,用于samgr启动,初始化所有注册的Service。

samgr_lite目录是系统服务框架子系统代码,这个目录是系统的基础系统服务框架代码,分布式调度子系统也是一个系统服务,将会注册在samgr里面。并且依赖系统服务框架进行服务的发布,注册等功能。

3.代码分析

3.1.轻量设备子系统代码分析

轻量设备分布式调度子系统的源码主要是以下的7个文件:

1.distributed_schedule_service.c 分布式调度对外接口

2.dmslite.c 分布式调度服务实现

3.dmslite_check_remote_permission.c 分布式调度权限管理模块

4.dmslite_famgr.c 分布式调度FA管理模块

5.dmslite_msg_parser.c 分布式消息解析模块

6.dmslite_session.c 跨设备通信收发模块

7.dmslite_tlv_common.c TLV格式数据解析模块

下面我们将分三个过程来分析源码:1、分布式调度服务的初始化。2、协议报文的接收和解析。3、轻量设备端拉起FA。

3.1.1.分布式调度服务初始化

分布式调度的服务和特性定义和初始化在distributed_schedule_service.c和dmslite.c文件中。

1.服务和特性的的定义和注册

服务的定义和初始化

服务的定义和初始化在distributed_schedule_service.c中。下面的代码定义了全局唯一的服务对象(用C语言实现了C++类的概念)

鸿蒙子系统解读-分布式任务调度篇

g_distributedService 就是全局唯一的服务对象结构体,而DistributedService定义继承了INHERIT_SERVICE,这是所有服务都必须继承的。

鸿蒙子系统解读-分布式任务调度篇
鸿蒙子系统解读-分布式任务调度篇

从上面的代码可以看出,所有的服务都必须要有4个成员。

GetName()成员就是让samgr可以得到这个服务的名称。

Initialize()成员就是服务的初始化过程,在samgr的SAMGR_Bootstrap()函数中会调用所有注册服务的初始化过程。

MessageHandle()成员是服务对外的消息处理函数。

GetTaskConfig()成员是向samgr上报服务的基本配置,包括:level,priority, stackSize,queueSize,taskFlag。

最后我们在distributed_schedule_service.c文件中看到如下的初始化定义:

鸿蒙子系统解读-分布式任务调度篇

上面的这段代码首先用SYS_SERVICE_INIT宏定义了分布式调度服务的初始化函数。这个函数被写入zinitcall这个数据段中(或者通过__attribute__((constructor)) 定义在bin文件的初始化过程中)。所以由SYS_SERVICE_INIT宏定义的函数都会在main函数之前被执行。

鸿蒙子系统解读-分布式任务调度篇

因此,在main()函数执行前,Init()(注意是distributed_schedule_service.c中的)会先被执行。我们看到在Init函数中,调用了SAMGR_GetInstance()->RegisterService((Service *)&g_distributedService); 注册了分布式调度子系统服务。下面我们在服务的初始化中介绍这个函数调用。

特性的定义和初始化

特性的定义在dmslite.c 中。如下代码:

鸿蒙子系统解读-分布式任务调度篇

g_dmslite是全局唯一的特性对象。DmsLite的定义如下:

鸿蒙子系统解读-分布式任务调度篇

INHERIT_FEATURE宏定义了所有的特性都要有的成员(也就是C++中继承的概念)

鸿蒙子系统解读-分布式任务调度篇

GetName:返回特性的字符串名称。

OnInitialize:特性的初始化函数。下面的初始化流程中有介绍。

OnStop:特性终止时调用。

OnMessage:特性的消息处理函数。用户可以通过IUnknown接口发送消息。

在dmslite.c中有如下的特性初始化定义:

鸿蒙子系统解读-分布式任务调度篇

SYS_FEATURE_INIT宏与SYS_SERVICE_INIT宏类似,会在main函数调用前被调用。这个宏用来注册特性的初始化函数入口Init(注意:是dmslite.c中的Init()函数)。

Init()函数调用SAMGR_GetInstance()取得系统服务框架子系统的全局唯一对象,然后调用RegisterFeature来注册Feature和FeatureApi。

2.服务的初始化

这个分布式调度子系统的初始化过程是在系统服务框架子系统中完成。上面说过在foundation/distributedschedule/services目录下有三个子目录,分别是samgr_lite、safwk_lite、dtbschedmgr_lite。其中safwk_lite目录中只包含了一个main.c文件,这个文件就是系统服务框架子系统的主入口。而samgr_lit和dtbschedmgr_lite目录下的源码将分别编译出库文件,然后跟safwk_lite下的main.c一起编译成foundation 这个bin文件。

我们先来看一下main.c这个文件。

鸿蒙子系统解读-分布式任务调度篇

从上面的代码可以看到,main()函数调用了 SAMGR_Bootstrap()后,进入了一个死循环。这个函数定义在samgr_lite/samgr/source/samgr_lite.c中,我们来看看这个函数。

鸿蒙子系统解读-分布式任务调度篇

这个函数的实现大体上看分为3个部分:

获取全局唯一的samgr系统服务对象

鸿蒙子系统解读-分布式任务调度篇

g_samgrImpl 是samgr的全局唯一对象,我们来看看这个对象这么定义和初始化的。

鸿蒙子系统解读-分布式任务调度篇
鸿蒙子系统解读-分布式任务调度篇

从上面的初始化函数Init可以看到,g_samgrImpl被赋值了许多函数指针,其中RegisterService就是给其他服务调用注册服务用的,而RegisterFeature、RegisterFeatureApi就是给服务注册特性用的。

那这个Init()什么时候被调用呢?经过搜索,我们发现是在SAMGR_GetInstance()函数中被调用,而前面我们说的分布式调度初始化过程的Init()函数中就会调用这个函数。我们看看这个函数的实现。

鸿蒙子系统解读-分布式任务调度篇

这个函数首先通过判断 g_samgrImpl.mutex 这个变量是否初始化过。如果没有,那么调用Init来初始化g_samgrImpl这个全局对象,然后返回g_samgrImpl这个全局对象中的函数指针表,这个指针表中就包含了注册服务、注册特性等函数。(这里有个疑问:这个函数一会直接使用g_samgrImpl全局变量,一会使用GetImplement()函数间接的使用这个全局变量,不知道为什么?)

我们的分布式调度子系统的服务注册初始化函数Init(注意是distributed_schedule_service.c中的)在调用了SAMGR_GetInstance()后,得到的函数指针表,然后调用了RegisterService注册服务对象,我们来看看RegisterService()干什么了。

鸿蒙子系统解读-分布式任务调度篇

我们看到RegisterService()函数大致也分为三个部分:

A、 查看需要注册的service是否已经注册过了。

B、 根据要注册的sevice对象构造一个serviceImpl对象。serviceImpl对象的结构定义如下:

鸿蒙子系统解读-分布式任务调度篇

上面我们可以看出:

service:就是我们自己的服务对象,也就是分布式调度的全局唯一的服务对象g_distributedService(前面有过介绍)

defaultApi: 是默认的IUnknown对象指针

taskPool: 是这个服务的任务池对象

features: 是这个服务的特性列表,使用SYS_FEATURE_INIT宏可以初始化一个服务的特性

serviceId: 这个应该是服务的编号

ops: 这个应该是服务的操作消息结构

C、 将serviceImpl对象加入到系统g_samgrImpl的services列表中。

将注册的服务加入到临时变量 initServices 这个Vector类型的变量中

这个部分就是把前面说的g_samrImpl中的services列表中的服务都加入到InitServices中。

根据initServices中的服务,初始化所有的注册服务,这里也包含了我们的分布式调度服务

这一步就是调用InitializeAllServices(&initServices)函数,我们来看下这个函数的实现。

鸿蒙子系统解读-分布式任务调度篇

这个函数大致分为两个部分:

A、轮询每个注册的服务,初始化taskPool,并且调用InitializeSingleService()函数。

我们看下InitializeSingleService()函数的调用过程:

鸿蒙子系统解读-分布式任务调度篇
鸿蒙子系统解读-分布式任务调度篇
鸿蒙子系统解读-分布式任务调度篇

最终调用到了DEFAULT_Initialize()函数。这个函数先执行impl->service->Initialize(impl->service, id);这里的impl->service->Initialize就是我们的分布式调度子系统服务对象里面定义的Initialize函数,前面有过介绍。

DEFAULT_Initialize()函数中下面的红框中的代码就是为注册的Features进行初始化的地方。这里会调用Feature注册的OnInitialize()函数(dmslite.c中),这个函数的实现如下:

鸿蒙子系统解读-分布式任务调度篇

这里PublicService是在分布式软总线(./foundation/communication/services/softbus_lite/discovery/discovery_service/source/discovery_service.c)中实现的函数,功能是在软总线中发布。

g_publishInfo全局变量是发布的信息:

鸿蒙子系统解读-分布式任务调度篇

g_publishCallback全局变量是发布的回调函数结构体:

鸿蒙子系统解读-分布式任务调度篇

我们看一下OnPublishSuccess这个函数,是在发布成功后的回调。

鸿蒙子系统解读-分布式任务调度篇
鸿蒙子系统解读-分布式任务调度篇

发布成功后调用RegisterTcpCallback来创建一个TCP的Session会话服务器,用于解释TCP报文。这里的g_sessionCallback定义如下:

鸿蒙子系统解读-分布式任务调度篇

从结构体的定义可以看出:

onBytesReceived:就是当tcp消息到达后用于分析消息的函数。

OnSessionOpened:就是在远程Session打开后(也就是智慧屏端连接启动一个Session会话后的初始化函数)。

OnSessionClosed:就是会话关闭后的处理函数。

B、轮询每个注册的服务,启动taskPool。

总结:

1、分布式调度初始化过程代码在:distributed_schedule_service.c、dmslite.c中

2、涉及模块:

a) softbus_lite:./foundation/communication/services/softbus_lite

3、分布式调度的初始化过程时序图:

鸿蒙子系统解读-分布式任务调度篇

3.1.2.协议报文的接受和解析

前面在初始化过程中,我们介绍过在轻量设备分布式调度的Feature初始化后,会在软总线发布。发布成功后,调用软总线的CreateSessionServer()注册协议处理的回调函数。回调函数总共有3个,分别如下:

鸿蒙子系统解读-分布式任务调度篇

我们先看一下OnSessionOpened()和OnSessionClosed(),代码如下:

鸿蒙子系统解读-分布式任务调度篇

上面代码显示在会话打开和关闭的时候,基本什么都没做,只是打印了log。这里的会话我理解是智慧屏端设备通过软总线与轻量设备建立的会话。

下面我们分析下最核心的函数OnBytesReceived()。这个函数应该是在会话有数据传输后被软总线回调的。

鸿蒙子系统解读-分布式任务调度篇

在OnBytesReceived()函数中,先用参数data和 dataLen构造了临时对象interInfo。这个对象中持有了软总线接收到的会话数据和长度,然后调用了DmsLiteProcessCommuMsg() 函数处理会话数据,并且设置了回调函数。回调函数如下定义:

鸿蒙子系统解读-分布式任务调度篇

这里的onStartAbilityDone()应该就是完成拉起上层应用的FA后的回调。在下一节中我们详细介绍。

我们继续看DmsLiteProcessCommuMsg()函数的实现,如下:

鸿蒙子系统解读-分布式任务调度篇

我们把DmsLiteProcessCommuMsg()函数的实现分为三部分:

A、进行参数校验和进程校验。CanCall是用户校验,通过getuid()函数取得当前用户ID,并判断是否是合法的用户。这里应该是一个初步的安全校验。

B、对会话报文数据进行解码

这部分代码主要是先通过调用DecodeDmsTlv()函数解析会话报文数据,然后调用Feature的回调函数onTlvParseDone()(前面我们看到这个回调并没有设置,所以这里不会调用)。

DecodeDmsTlv()函数总共3个参数,前两个参数是会话报文的数据和长度,最后一个是输出结构体TlsDmsMsgInfo,定义如下:

鸿蒙子系统解读-分布式任务调度篇

commandId:智慧屏发送给轻量设备的命令字,例如:DMS_MSG_CMD_START_FA(目前的代码好像只定义了这一个命令字,后续应该会扩展)

calleeBundleName:轻量设备端接收命令的包名,也就是要拉起的FA的包名

calleeAbilityName:轻量设备端接收命令的Ability名,也就是要拉起FA后显示的Ability的名字

callerSignature:智慧屏设备端发起者的签名,用于安全检查

我们再来分析下DecodeDmsTlv()函数的实现,如下:

鸿蒙子系统解读-分布式任务调度篇

①调用TlvBytesToNode()将报文转化为Node数据格式,并且返回Node的头指针。这里不做详细代码分析了,有兴趣的同学可以自己研究下协议解析的代码。

②调用ReadTlvNode()读取Node的数据到输出结构dmsMsg。这里不做详细代码分析了,有兴趣的同学可以自己研究下协议解析的代码。

C、根据解码得到的命令字,执行相应的动作。这里从代码上看只支持一个动作:START_FA。下一节详细介绍拉起FA的过程。

1、会话数据的处理和协议的解析处理在dmslite_session.c,dmslite_msg_parser.c,dmslite_tlv_common.c中。

a)Softbus_lite:./foundation/communication/services/softbus_lite

3、会话数据处理和协议解析的时序图如下:

鸿蒙子系统解读-分布式任务调度篇