本文是参数服务器的第四篇,介绍KVWorker, KVServer。
目录
[源码解析]机器学习参数服务器ps-lite(4) ----- 应用节点实现
0x00 摘要
0x01 基础类
1.1 Range
1.2 TreadsafeQueue
0x02 SimpleApp
2.1 概述
2.2 定义
2.2.1 支撑类
2.2.2 成员变量
2.3 功能函数
0x03 KVServer
3.1 定义
3.2 功能函数
3.2.1 Response
3.2.2 Process
3.2.3 例子函数
3.2.4 流程
0x04 KVWorker
4.1 概述
4.2 定义
4.3 功能函数
4.3.1 Push & ZPush
4.3.2 Pull
4.3.3 ZPull
4.3.4 Send
4.3.5 DefaultSlicer
4.3.6 PushPull & ZPushPull
4.3.7 Callback 相关
4.3.7.1 设置
4.3.7.2 AddPullCB
4.3.7.3 运行
4.3.8 Process
0x05 总结
0xEE 个人信息
0xFF 参考
本系列其他文章是:
[源码解析] 机器学习参数服务器ps-lite 之(1) ----- PostOffice
[源码解析] 机器学习参数服务器ps-lite(2) ----- 通信模块Van
[源码解析] 机器学习参数服务器ps-lite 之(3) ----- 代理人Customer
KVWorker, KVServer这两个分别是 Server / Worker 节点的抽象,是被 Van ---> Customer ---> recv_handle_ 来作为引擎的一部分来启动的。
本文会先介绍一些基础支撑类,然后介绍 Server / Worker的基类 SimpleApp,最后介绍 Server / Worker 的具体实现。
总体流程图提前剧透如下:

我们首先需要介绍一些基础类。
Range 类作用是:根据这个Range确定要拉取的参数在哪个server上,以及一个server对应的key的range。
Range 类提供如下函数:
begin()和end()两个uint64的位置;
size() 获得 本 range 的大小,即 end_ - begin_;
TreadsafeQueue 是一个可以供多个线程读取的队列,通过锁和条件量合作来达到线程安全,用来做消息队列。
SimpleApp是一个基类,把应用节点功能做了一个统一抽象。
提供了基本发送功能和简单消息处理函数(Request, Wait, Response)。
消息类型为:int型的head和string型的body。
它有2个派生类。KVServer和KVWorker。
SimpleData 定义了 Request 和 Response 的基本格式。
SimpleApp 主要有如下成员变量:
Customer* obj_ :本 App 的 Customer,控制请求连接;
Handle request_handle_ :request 处理函数;
Handle response_handle_ :response 处理函数;
set_request_handle,set_response_handle:设置成员<code>request_handle_</code>, <code>response_handle_</code>。在客户端调用SimpleApp::Process时,根据message.meta中的指示变量判断是request还是response,调用相应handle处理;
三个简单功能函数如下:
Request 就是调用 Van 发送消息。
Response 是调用 Van 回复消息。
Process 函数根据message.meta中的指示变量判断是request还是response,调用相应handle处理。
KVServer 是 Server 节点的抽象,其作用是 接收信息、处理信息、返回结果三个步骤,主要功能是:
维护 key-value pairs 数据;
处理 & 应答 客户端的 push & pull 请求;
函数<code>request_handle_</code> 处理请求:
在调用KVServer::Process时 会调用到 <code>request_handle_</code> 。
<code>request_handle_</code>默认为<code>KVServerDefaultHandle</code>。
函数<code>Response</code>用于返回数据;
request_handle_ 是 request 处理函数,需要自定义。
在该回调函数中使用者则需要实现各种优化器的的模型权重梯度更新算法和模型权重返回操作。
可直接参考ps-lite已实现的默认版本KVServerDefaultHandle。
<code>Response()</code>就是向调用的worker发送 response 信息。与SimpleApp 比较下,发现 KVServer 这里对于 head 和 body 都有了新的处理。
需要注意的是:Response 函数应该是被用户自定义的 <code>request_handle_</code> 调用,即 <code>request_handle_</code> 处理收到的消息,然后调用 Response 对 worker 进行回复应答。
<code>Process()</code>被注册到Customer对象中,当Customer对象的receiving thread接受到消息时,就调用<code>Process()</code>对数据进行处理。
<code>Process()</code>内部的逻辑是:
提取消息的元信息,构建一个 KVMeta。
可以看到,在 Process 中没有对 KV 数据的维护。
Process 调用 用户自行实现的一个request_handle_ (std::function函数对象)对数据进行处理。
在回调函数 request_handle_ 中使用者则需要实现各种优化器的的模型权重梯度更新算法和模型权重返回操作。
KVServerDefaultHandle 是 ps-lite 提供的例子,用于演示如何维护 KV,处理消息,返回请求。
这里维护一个哈希表 unordered_map,记录key和value,并对push和pull请求进行响应。
使用std::unordered_map store保存server的参数,当请求为push时,对store参数做更新,请求为pull时对参数进行拉取;
我们接着上文继续梳理细化流程。
worker节点 或者 server节点 在程序的最开始会执行<code>Postoffice::start()</code>。
<code>Postoffice::start()</code>会初始化节点信息,并且调用<code>Van::start()</code>。
每个节点都监听了本地一个端口;该连接的节点在启动时已经连接。
<code>Van::start()</code> 启动一个本地线程专门接收socket的信息,使用<code>Van::Receiving()</code>来持续监听收到的message。
<code>Van::Receiving()</code>接收后消息之后,根据不同命令执行不同动作。针对数据消息,如果需要下一步处理,会调用 ProcessDataMsg:
依据消息中的app id找到 Customer(每个app 任务会绑定一个custom类),即会根据customer id的不同将message发给不同的customer的recv thread。
将消息传递给<code>Customer::Accept</code>函数。
Customer::Accept() 函数将消息添加到一个队列<code>recv_queue_</code>;
Customer 对象本身也会启动一个接受线程 <code>recv_thread_</code>,使用 Customer::Receiving() :
不断的从<code>recv_queue_</code>队列取消息。
如果 (!recv.meta.request) ,就说明是 response,则<code>tracker_[req.timestamp].second++</code>
调用注册的用户自定义的<code>recv_handle_</code>函数对消息进行处理。
对于worker来说,其注册的<code>recv_handle_</code>是<code>KVWorker::Process()</code>函数。因为worker的recv thread接受到的消息主要是从server处pull下来的KV对,因此该<code>Process()</code>主要是接收message中的KV对;
而对于Server来说,其注册的<code>recv_handle_</code>是<code>KVServer::Process()</code>函数。
因为我们这里是 KVServer,而且server接受的是worker们push上来的KV对,需要对其进行处理,因此该<code>Process()</code>函数中调用的用户通过<code>KVServer::set_request_handle()</code>传入的函数对象。
在 用户自定义的 request_handle_ 函数中,如果需要发送 response 给 worker,则调用 KVServer::Response。
目前逻辑如下图,在 第 8 步,recv_handle_ 指向 KVServer::Process 或者 KVWorker::Process(本节是server,所以对应的是KVServer::Process)。在第10步,返回 response 给 worker。
KVWorker用于向server节点push,pull key-value对,就是在算法过程中,需要并行处理的各种参数。
Worker中的push和pull操作都是异步返回一个ID,然后使用ID进行wait阻塞等待,即同步操作。
或者异步调用时传入一个Callback进行后续操作。
KVWorker 主要变量为:
std::unordered_map<int, std::vector<KVPairs>> recv_kvs :收到的pull 结果: kv value ;
std::unordered_map<int, Callback> callbacks :收到 request 的所有 response 之后执行的回调函数;
Slicer slicer_ :默认 slice 函数变量,该函数在调用Send函数时,将KVPairs按照每个server的Range切片;
主要函数为:
ZPush 零拷贝push函数
ZPull 零拷贝pull函数
AddPullCB key重组函数
Process 消息处理函数
DefaultSlicer 默认的slice 处理函数
set_slicer:设置slicer_成员,该函数在调用Send函数时,将KVPairs按照每个server的Range切片;
因为 Push 调用了 ZPush,所以我们放在一起介绍。
Push方法主要就是:
把数据(KV列表)发送到对应的服务器节点;
KV列表是依据每个服务器维护的 Key range 来进行分区发送;
Push 是异步直接返回,如果想知道返回结果如何,则可以:
使用 Wait 来等待,即利用tracker_来记录发送的请求量和对应的响应请求量,当发送量等于接收量的时候,表示每个请求都成功发送了,以此来达到同步的目的;
使用 callback,这样当结束时候就可以回调到。
ZPush 方法是:
使用obj_(Customer类型)的 NewRequest 方法来记录记录发送的请求量和对应的响应请求量,并且返回一个时间戳;
设置好对应 timestamp 的 callback;
使用传入的参数构造KVPair对象,调用Send送出该对象;
如何调用可以参考其注释:
pull方法跟push的逻辑大体类似:
绑定一个回调函数,用于拷贝数据,并且得到一个时间戳。
根据key_vector从Server上拉取val_vector,
最终返回timestamp,
该函数不阻塞,可用worker.Wait(timestamp)等待;
逻辑与 Pull 一致,只是省略了拷贝到系统这个过程。因此需要保证在ZPull完成前,调用者没有改变key_vector;
<code>Push()</code>和<code>Pull()</code>最后都会调用<code>Send()</code>函数,<code>Send()</code>对KVPairs进行切分,因为每个Server只保留一部分参数,因此切分后的SlicedKVpairs就会被发送给不同的Server。
如果是 skipped,则会直接调用 callback。
否则遍历发送。
切分函数可以由用户自行重写,默认为<code>DefaultSlicer</code>,每个SlicedKVPairs被包装成Message对象,然后用<code>van::send()</code>发送。
根据<code>std::vector& ranges</code>分片范围信息,将要发送的数据进行分片。目前默认的使用 <code>Postoffice::GetServerKeyRanges</code>来划分分片范围。
就是把 push,pull 聚合在一起。
前面提到了一些回调函数的设置,下面我们看看如何使用。
我们可以看到,针对每个时间戳,设置了一个回调函数,进而构成了一个回调函数列表。
每次发送请求之后,都会往这个列表中注册回调函数。
这是 pull 之后,得到应答的回调函数,用于拷贝返回的数据。
但是,如果是多个 Server 都应该有返回,应该如何处理?无论是 push 还是 pull,只有在收到了所有的Response之后,才会将从各个server上拉取的value填入本地的<code>vals</code>里。
就是依据时间戳找到回调函数,运行,然后删除。
何时调用,就是在 Process 之中会调用,我们马上介绍。
如果是 Pull 的 response, 在每次收到的Response返回的values,会先保存<code>recv_kvs_</code>里,<code>recv_kvs_[ts].push_back(kvs);</code>
无论是 push 还是 pull,只有在收到了所有的Response之后,才会将从各个server上拉取的value填入本地的<code>vals</code>里。
最后我们用一个消息传递流程做一下总结,看看各个部分在其中如何使用。总体流程图如下:
worker节点 要发送消息,所以调用了 Send 方法。
Send 方法会调用到了 Customer的 NewRequest,来建立一个新请求。
Send方法会调用 Van 的 send 方法来进行网络交互。
经过网络传递之后,流程来到了 Server 处,对于 Server 来说,这是一个 Request,调用到了 Van 的 Receiving。<code>Van::Receiving()</code>接收后消息之后,根据不同命令执行不同动作。针对数据消息,如果需要下一步处理,会调用 ProcessDataMsg。
继续调用到 Van 的 ProcessDataMsg,然后调用 GetCustomer。
GetCustomer 会调用到Postoffice,对于 customers_ 进行相应处理。
Customer 会使用 Accept 来处理消息。
Customer::Accept() 函数将消息添加到一个队列<code>recv_queue_</code>。
Customer 对象本身也会启动一个接受线程 <code>recv_thread_</code>,使用 Customer::Receiving() :
<code>Van::Receiving()</code> 调用注册的用户自定义的<code>recv_handle_</code>函数对消息进行处理。
对于Server来说,其注册的<code>recv_handle_</code>是<code>KVServer::Process()</code>函数。
Process 函数调用 request_handle_ 继续处理,生成 Response,返回给 Worker。
Response 经过网络传递给 Worker。
运行回到了 Worker,来到了 Worker 的 Van。对于 worker 来说,这是一个 Request,调用到了 Van 的 Receiving。(以下操作序列和 Server 类似)
<code>Van::Receiving()</code>接收后消息之后,根据不同命令执行不同动作。针对数据消息,如果需要下一步处理,会调用 ProcessDataMsg。
这里有个解耦合,由一个新线程 <code>recv_thread_</code>处理。
Customer 对象本身已经启动一个新线程 <code>recv_thread_</code>,使用 Customer::Receiving() 。
对于Worker来说,其注册的<code>recv_handle_</code>是<code>KVWorker::Process()</code>函数。
调用到<code>KVWorker::Process()</code>函数处理响应消息Response。
手机如下:

★★★★★★关于生活和技术的思考★★★★★★
微信公众账号:罗西的思考
如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,敬请关注。
史上最全面的ps-lite理解
从零实现机器学习参数服务器框架(二)