天天看點

[源碼解析]機器學習參數伺服器ps-lite(4) ----- 應用節點實作

本文是參數伺服器的第四篇,介紹KVWorker, KVServer。

目錄

[源碼解析]機器學習參數伺服器ps-lite(4) ----- 應用節點實作

0x00 摘要

0x01 基礎類

1.1 Range

1.2 TreadsafeQueue

0x02 SimpleApp

2.1 概述

2.2 定義

2.2.1 支撐類

2.2.2 成員變量

2.3 功能函數

0x03 KVServer

3.1 定義

3.2 功能函數

3.2.1 Response

3.2.2 Process

3.2.3 例子函數

3.2.4 流程

0x04 KVWorker

4.1 概述

4.2 定義

4.3 功能函數

4.3.1 Push & ZPush

4.3.2 Pull

4.3.3 ZPull

4.3.4 Send

4.3.5 DefaultSlicer

4.3.6 PushPull & ZPushPull

4.3.7 Callback 相關

4.3.7.1 設定

4.3.7.2 AddPullCB

4.3.7.3 運作

4.3.8 Process

0x05 總結

0xEE 個人資訊

0xFF 參考

本系列其他文章是:

[源碼解析] 機器學習參數伺服器ps-lite 之(1) ----- PostOffice

[源碼解析] 機器學習參數伺服器ps-lite(2) ----- 通信子產品Van

[源碼解析] 機器學習參數伺服器ps-lite 之(3) ----- 代理人Customer

KVWorker, KVServer這兩個分别是 Server / Worker 節點的抽象,是被 Van ---> Customer ---> recv_handle_ 來作為引擎的一部分來啟動的。

本文會先介紹一些基礎支撐類,然後介紹 Server / Worker的基類 SimpleApp,最後介紹 Server / Worker 的具體實作。

總體流程圖提前劇透如下:

[源碼解析]機器學習參數伺服器ps-lite(4) ----- 應用節點實作

我們首先需要介紹一些基礎類。

Range 類作用是:根據這個Range确定要拉取的參數在哪個server上,以及一個server對應的key的range。

Range 類提供如下函數:

begin()和end()兩個uint64的位置;

size() 獲得 本 range 的大小,即 end_ - begin_;

TreadsafeQueue 是一個可以供多個線程讀取的隊列,通過鎖和條件量合作來達到線程安全,用來做消息隊列。

SimpleApp是一個基類,把應用節點功能做了一個統一抽象。

提供了基本發送功能和簡單消息處理函數(Request, Wait, Response)。

消息類型為:int型的head和string型的body。

它有2個派生類。KVServer和KVWorker。

SimpleData 定義了 Request 和 Response 的基本格式。

SimpleApp 主要有如下成員變量:

Customer* obj_ :本 App 的 Customer,控制請求連接配接;

Handle request_handle_ :request 處理函數;

Handle response_handle_ :response 處理函數;

set_request_handle,set_response_handle:設定成員<code>request_handle_</code>, <code>response_handle_</code>。在用戶端調用SimpleApp::Process時,根據message.meta中的訓示變量判斷是request還是response,調用相應handle處理;

三個簡單功能函數如下:

Request 就是調用 Van 發送消息。

Response 是調用 Van 回複消息。

Process 函數根據message.meta中的訓示變量判斷是request還是response,調用相應handle處理。

KVServer 是 Server 節點的抽象,其作用是 接收資訊、處理資訊、傳回結果三個步驟,主要功能是:

維護 key-value pairs 資料;

處理 &amp; 應答 用戶端的 push &amp; pull 請求;

函數<code>request_handle_</code> 處理請求:

在調用KVServer::Process時 會調用到 <code>request_handle_</code> 。

<code>request_handle_</code>預設為<code>KVServerDefaultHandle</code>。

函數<code>Response</code>用于傳回資料;

request_handle_ 是 request 處理函數,需要自定義。

在該回調函數中使用者則需要實作各種優化器的的模型權重梯度更新算法和模型權重傳回操作。

可直接參考ps-lite已實作的預設版本KVServerDefaultHandle。

<code>Response()</code>就是向調用的worker發送 response 資訊。與SimpleApp 比較下,發現 KVServer 這裡對于 head 和 body 都有了新的處理。

需要注意的是:Response 函數應該是被使用者自定義的 <code>request_handle_</code> 調用,即 <code>request_handle_</code> 處理收到的消息,然後調用 Response 對 worker 進行回複應答。

<code>Process()</code>被注冊到Customer對象中,當Customer對象的receiving thread接受到消息時,就調用<code>Process()</code>對資料進行處理。

<code>Process()</code>内部的邏輯是:

提取消息的元資訊,建構一個 KVMeta。

可以看到,在 Process 中沒有對 KV 資料的維護。

Process 調用 使用者自行實作的一個request_handle_ (std::function函數對象)對資料進行處理。

在回調函數 request_handle_ 中使用者則需要實作各種優化器的的模型權重梯度更新算法和模型權重傳回操作。

KVServerDefaultHandle 是 ps-lite 提供的例子,用于示範如何維護 KV,處理消息,傳回請求。

這裡維護一個哈希表 unordered_map,記錄key和value,并對push和pull請求進行響應。

使用std::unordered_map store儲存server的參數,當請求為push時,對store參數做更新,請求為pull時對參數進行拉取;

我們接着上文繼續梳理細化流程。

worker節點 或者 server節點 在程式的最開始會執行<code>Postoffice::start()</code>。

<code>Postoffice::start()</code>會初始化節點資訊,并且調用<code>Van::start()</code>。

每個節點都監聽了本地一個端口;該連接配接的節點在啟動時已經連接配接。

<code>Van::start()</code> 啟動一個本地線程專門接收socket的資訊,使用<code>Van::Receiving()</code>來持續監聽收到的message。

<code>Van::Receiving()</code>接收後消息之後,根據不同指令執行不同動作。針對資料消息,如果需要下一步處理,會調用 ProcessDataMsg:

依據消息中的app id找到 Customer(每個app 任務會綁定一個custom類),即會根據customer id的不同将message發給不同的customer的recv thread。

将消息傳遞給<code>Customer::Accept</code>函數。

Customer::Accept() 函數将消息添加到一個隊列<code>recv_queue_</code>;

Customer 對象本身也會啟動一個接受線程 <code>recv_thread_</code>,使用 Customer::Receiving() :

不斷的從<code>recv_queue_</code>隊列取消息。

如果 (!recv.meta.request) ,就說明是 response,則<code>tracker_[req.timestamp].second++</code>

調用注冊的使用者自定義的<code>recv_handle_</code>函數對消息進行處理。

對于worker來說,其注冊的<code>recv_handle_</code>是<code>KVWorker::Process()</code>函數。因為worker的recv thread接受到的消息主要是從server處pull下來的KV對,是以該<code>Process()</code>主要是接收message中的KV對;

而對于Server來說,其注冊的<code>recv_handle_</code>是<code>KVServer::Process()</code>函數。

因為我們這裡是 KVServer,而且server接受的是worker們push上來的KV對,需要對其進行處理,是以該<code>Process()</code>函數中調用的使用者通過<code>KVServer::set_request_handle()</code>傳入的函數對象。

在 使用者自定義的 request_handle_ 函數中,如果需要發送 response 給 worker,則調用 KVServer::Response。

目前邏輯如下圖,在 第 8 步,recv_handle_ 指向 KVServer::Process 或者 KVWorker::Process(本節是server,是以對應的是KVServer::Process)。在第10步,傳回 response 給 worker。

KVWorker用于向server節點push,pull key-value對,就是在算法過程中,需要并行處理的各種參數。

Worker中的push和pull操作都是異步傳回一個ID,然後使用ID進行wait阻塞等待,即同步操作。

或者異步調用時傳入一個Callback進行後續操作。

KVWorker 主要變量為:

std::unordered_map&lt;int, std::vector&lt;KVPairs&gt;&gt; recv_kvs :收到的pull 結果: kv value ;

std::unordered_map&lt;int, Callback&gt; callbacks :收到 request 的所有 response 之後執行的回調函數;

Slicer slicer_ :預設 slice 函數變量,該函數在調用Send函數時,将KVPairs按照每個server的Range切片;

主要函數為:

ZPush 零拷貝push函數

ZPull 零拷貝pull函數

AddPullCB key重組函數

Process 消息處理函數

DefaultSlicer 預設的slice 處理函數

set_slicer:設定slicer_成員,該函數在調用Send函數時,将KVPairs按照每個server的Range切片;

因為 Push 調用了 ZPush,是以我們放在一起介紹。

Push方法主要就是:

把資料(KV清單)發送到對應的伺服器節點;

KV清單是依據每個伺服器維護的 Key range 來進行分區發送;

Push 是異步直接傳回,如果想知道傳回結果如何,則可以:

使用 Wait 來等待,即利用tracker_來記錄發送的請求量和對應的響應請求量,當發送量等于接收量的時候,表示每個請求都成功發送了,以此來達到同步的目的;

使用 callback,這樣當結束時候就可以回調到。

ZPush 方法是:

使用obj_(Customer類型)的 NewRequest 方法來記錄記錄發送的請求量和對應的響應請求量,并且傳回一個時間戳;

設定好對應 timestamp 的 callback;

使用傳入的參數構造KVPair對象,調用Send送出該對象;

如何調用可以參考其注釋:

pull方法跟push的邏輯大體類似:

綁定一個回調函數,用于拷貝資料,并且得到一個時間戳。

根據key_vector從Server上拉取val_vector,

最終傳回timestamp,

該函數不阻塞,可用worker.Wait(timestamp)等待;

邏輯與 Pull 一緻,隻是省略了拷貝到系統這個過程。是以需要保證在ZPull完成前,調用者沒有改變key_vector;

<code>Push()</code>和<code>Pull()</code>最後都會調用<code>Send()</code>函數,<code>Send()</code>對KVPairs進行切分,因為每個Server隻保留一部分參數,是以切分後的SlicedKVpairs就會被發送給不同的Server。

如果是 skipped,則會直接調用 callback。

否則周遊發送。

切分函數可以由使用者自行重寫,預設為<code>DefaultSlicer</code>,每個SlicedKVPairs被包裝成Message對象,然後用<code>van::send()</code>發送。

根據<code>std::vector&amp; ranges</code>分片範圍資訊,将要發送的資料進行分片。目前預設的使用 <code>Postoffice::GetServerKeyRanges</code>來劃分分片範圍。

就是把 push,pull 聚合在一起。

前面提到了一些回調函數的設定,下面我們看看如何使用。

我們可以看到,針對每個時間戳,設定了一個回調函數,進而構成了一個回調函數清單。

每次發送請求之後,都會往這個清單中注冊回調函數。

這是 pull 之後,得到應答的回調函數,用于拷貝傳回的資料。

但是,如果是多個 Server 都應該有傳回,應該如何處理?無論是 push 還是 pull,隻有在收到了所有的Response之後,才會将從各個server上拉取的value填入本地的<code>vals</code>裡。

就是依據時間戳找到回調函數,運作,然後删除。

何時調用,就是在 Process 之中會調用,我們馬上介紹。

如果是 Pull 的 response, 在每次收到的Response傳回的values,會先儲存<code>recv_kvs_</code>裡,<code>recv_kvs_[ts].push_back(kvs);</code>

無論是 push 還是 pull,隻有在收到了所有的Response之後,才會将從各個server上拉取的value填入本地的<code>vals</code>裡。

最後我們用一個消息傳遞流程做一下總結,看看各個部分在其中如何使用。總體流程圖如下:

worker節點 要發送消息,是以調用了 Send 方法。

Send 方法會調用到了 Customer的 NewRequest,來建立一個新請求。

Send方法會調用 Van 的 send 方法來進行網絡互動。

經過網絡傳遞之後,流程來到了 Server 處,對于 Server 來說,這是一個 Request,調用到了 Van 的 Receiving。<code>Van::Receiving()</code>接收後消息之後,根據不同指令執行不同動作。針對資料消息,如果需要下一步處理,會調用 ProcessDataMsg。

繼續調用到 Van 的 ProcessDataMsg,然後調用 GetCustomer。

GetCustomer 會調用到Postoffice,對于 customers_ 進行相應處理。

Customer 會使用 Accept 來處理消息。

Customer::Accept() 函數将消息添加到一個隊列<code>recv_queue_</code>。

Customer 對象本身也會啟動一個接受線程 <code>recv_thread_</code>,使用 Customer::Receiving() :

<code>Van::Receiving()</code> 調用注冊的使用者自定義的<code>recv_handle_</code>函數對消息進行處理。

對于Server來說,其注冊的<code>recv_handle_</code>是<code>KVServer::Process()</code>函數。

Process 函數調用 request_handle_ 繼續處理,生成 Response,傳回給 Worker。

Response 經過網絡傳遞給 Worker。

運作回到了 Worker,來到了 Worker 的 Van。對于 worker 來說,這是一個 Request,調用到了 Van 的 Receiving。(以下操作序列和 Server 類似)

<code>Van::Receiving()</code>接收後消息之後,根據不同指令執行不同動作。針對資料消息,如果需要下一步處理,會調用 ProcessDataMsg。

這裡有個解耦合,由一個新線程 <code>recv_thread_</code>處理。

Customer 對象本身已經啟動一個新線程 <code>recv_thread_</code>,使用 Customer::Receiving() 。

對于Worker來說,其注冊的<code>recv_handle_</code>是<code>KVWorker::Process()</code>函數。

調用到<code>KVWorker::Process()</code>函數處理響應消息Response。

手機如下:

[源碼解析]機器學習參數伺服器ps-lite(4) ----- 應用節點實作

★★★★★★關于生活和技術的思考★★★★★★

微信公衆賬号:羅西的思考

如果您想及時得到個人撰寫文章的消息推送,或者想看看個人推薦的技術資料,敬請關注。

[源碼解析]機器學習參數伺服器ps-lite(4) ----- 應用節點實作

史上最全面的ps-lite了解

從零實作機器學習參數伺服器架構(二)