天天看點

【ceph 】ceph Dispatcher子產品分析

Dispatcher

dispatcher 是消息分發中心,所有收到的消息都經由該子產品,并由該子產品轉發給相應的處理子產品(moncliet、mdsclient、osd等)。

其實作方式比較簡單,就是把所有的子產品及其處理消息的方法 handle 注冊到分發中心,具體函數為 add_dispatcher_head/tail(),這樣就向 dispatcher_queue 中添加了指定子產品。後續在分發消息時,對 dispatcher_queue 進行輪詢,直到有一個處理子產品能夠處理該消息,通過 message->get_type() 來指定消息的處理函數。所有的消息分發都在 dispatcher 線程中完成。

在 add_dispatcher_head() 和 add_dispatcher_tail() 函數中,都做了 dispatcher 隊列是否為空的判斷(通過 dispatchers.empty() == true)。如果判定結果為空,說明需要重新建立 dispatcher 線程并綁定服務端位址,加入事件中心監聽端口,具體方法在 ready() 中。

void add_dispatcher_head(Dispatcher *d) {
bool first = dispatchers.empty();
dispatchers.push_front(d);
if (d->ms_can_fast_dispatch_any())
fast_dispatchers.push_front(d);
if (first)
ready();
  }
      

 add_dispatche_* 中調用了 AsyncMessenger::ready() 方法。下面給出AsyncMessenger::ready()方法代碼:

p->start()(Processor::start())方法中監聽 EVENT_READABLE 事件,并把事件送出到 EventCenter 事件中心,由上文介紹的 msgr-worker-x 線程去輪詢事件中心的隊列,監聽端口是否收到消息。收到的消息則由 dispatcher 線程分發給指定的處理程式,其分發消息的接口為 ms_dispatch() 和 ms_fast_dispatch()。

dispatch_queue.start() 中開啟了消息分發線程,分别為處理外部消息的 ms_dispatch 線程和處理本地消息的 ms_local 線程。相應的,它們有各自的優先級隊列(注意:分發消息的隊列時有優先級的,優先級越高,發送時機越早),分别是存儲外部消息的 mqueue 和本地消息隊列的 local_messages。消息隊列的添加方式也有兩種:mqueue.enqueue() 和 local_queue.emplace()。

void AsyncMessenger::ready()
{
ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;

stack->ready();
//綁定端口
if (pending_bind) {
int err = bindv(pending_bind_addrs);
if (err) {
lderr(cct) << __func__ << " postponed bind failed" << dendl;
ceph_abort();
    }
  }

Mutex::Locker l(lock);
//調用 worker 線程,監聽端口
for (auto &&p : processors)  
p->start();
//開啟 ms_dispatcher 和 ms_locla 線程
dispatch_queue.start();
}

void DispatchQueue::start()
{
ceph_assert(!stop);
ceph_assert(!dispatch_thread.is_started());
//開啟 ms_dispatch 和 ms_local 線程
dispatch_thread.create("ms_dispatch");
local_delivery_thread.create("ms_local");
}      

連結:https://www.jianshu.com/p/58956728dadc

============================================================

Dipatcher類是消息分發的接口,OSD、MON、等類都繼承該類,并實作了Dipatcher的消息分發接口

class OSD : public Dispatcher,public md_config_obs_t
{
/** OSD **/
}

class Monitor : public Dispatcher,public md_config_obs_t
{
public:

// me
string name;
}      
在OSD::init()函數中把不同類型的Dipatcher加入到SimpleMessenger執行個體中      
// i'm ready!
client_messenger->add_dispatcher_head(this);
cluster_messenger->add_dispatcher_head(this);
hbclient_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
objecter_messenger->add_dispatcher_head(service.objecter);      

在Messenger::add_dispatcher_head(Dispatcher *d)中加入Messenger::list<Dispatcher*> dispatchers中,并調用ready(),SimpleMessenger::ready()重寫了基類的ready,

void add_dispatcher_head(Dispatcher *d)
{
bool first = dispatchers.empty();
dispatchers.push_front(d);
if (d->ms_can_fast_dispatch_any())
fast_dispatchers.push_front(d);
if (first)
ready();
}      

在ready函數中調用DispatchQueue::start, start()函數啟動DispatchQueue::DispatchThread和DispatchQueue::LocalDeliveryThread線程類,最終調用DispatchQueue::entry()和DispatchQueue::run_local_delivery。

void DispatchQueue::start()
{
assert(!stop);
assert(!dispatch_thread.is_started());
dispatch_thread.create("ms_dispatch");    //調用Thread::create->Thread::try_create->Thread::_entry_func->Thread::entry_wrapper->DispatchThread::entry
local_delivery_thread.create("ms_local");
}      
class DispatchThread : public Thread
{
DispatchQueue *dq;
public:
explicit DispatchThread(DispatchQueue *dq) : dq(dq) {}
void *entry()
    {
dq->entry();
return 0;
    }
} dispatch_thread;      

在DispatchQueue::entry()中調用根據不同的指令碼調用不同的Messenger類中的處理函數

void DispatchQueue::entry()
{
    .
    .

switch (qitem.get_code())
    {
case D_BAD_REMOTE_RESET:
msgr->ms_deliver_handle_remote_reset(qitem.get_connection());
break;
case D_CONNECT:
msgr->ms_deliver_handle_connect(qitem.get_connection());
break;
case D_ACCEPT:
msgr->ms_deliver_handle_accept(qitem.get_connection());
break;
case D_BAD_RESET:
msgr->ms_deliver_handle_reset(qitem.get_connection());
break;
default:
assert(0);
    }
}
else
{
Message *m = qitem.get_message();
if (stop)
    {
ldout(cct, 10) << " stop flag set, discarding " << m << " " << *m << dendl;
m->put();
    }
else
    {
uint64_t msize = pre_dispatch(m);
msgr->ms_deliver_dispatch(m);
post_dispatch(m, msize);
    }
}
.
.

}      

在Messenger::ms_deliver_dispatch中最終調用不同的Dipatcher繼承類的ms_dispatch進行處理

void ms_deliver_dispatch(Message *m)
{
m->set_dispatch_stamp(ceph_clock_now(cct));
for (list<Dispatcher *>::iterator p = dispatchers.begin();
p != dispatchers.end();
++p)
    {
if ((*p)->ms_dispatch(m))     //在Dispatcher繼承類中進行處理
return;
    }
lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
<< m->get_source_inst() << dendl;
assert(!cct->_conf->ms_die_on_unhandled_msg);
m->put();
}      

建立 osd_dispatcher/mon_dispatcher ==>add_dispatcher_

                                                                 ==>ready()

                                                                       ==>start()

                                                                             ==>DispatchThread

                                                                                   ==>entry()

                                                                             ==>LocalDeliveryThread

                                                                                   ==>run_local_delivery()

ms_fast_dispatch和ms_dispatch的差別

d) 調用函數read_message()來接收消息,當本函數傳回後,就完成了接收消息

2) 調用函數in_q->fast_preprocess(m)預處理消息

3) 調用函數in_q->can_fast_dispatch(m),如果可以進行fast_dispatch,就in_q->fast_dispatch(m)處理。fast_dispatch并不把消息加入到mqueue裡,而是直接調用msgr->ms_fast_dispatch()函數,并最終調用注冊的fast_dispatcher來進行處理。

4) 如果不能fast_dispatch,就調用函數in_q->enqueue(m, m->get_priority(), conn_id)把接收到的消息加入到DispatchQueue的mqueue隊列裡,由DispatchQueue的分發線程調用ms_dispatch處理。

ms_fast_dispatch和ms_dispatch兩種處理的差別在于:ms_dispatch是由DispatchQueue的線程處理的,它是一個單線程;ms_fast_dispatch函數是由Pipe接收線程直接調用處理的,是以性能比前者好。

下一篇: CephFS 使用

繼續閱讀