http://blog.csdn.net/lewif/article/details/50586030
Contents
- MessageQueue waits for messages
- epoll watches the fds
- The pipe fd makes epoll_wait return
- The BitTube fd makes epoll_wait return
MessageQueue (MQ for short) is the message-handling "steward" of the surfaceflinger (SF) main thread: every worker thread that wants to talk to the main thread goes through MQ, e.g. to post messages or to deliver Vsync signals. This article analyzes how MQ is actually implemented.
The figure below gives a rough overview of how MQ processes events (yellow boxes are class names, green ones are class members). MQ handles two kinds of events, Messages and Events (Vsync), marked ① and ② in the figure; under epoll_wait(), P denotes the pipe descriptor and B the BitTube descriptor.
MessageQueue waits for messages
As analyzed earlier, the SF process is at its core a loop that receives messages and then handles them. At the end of SF startup, run() is executed; it is a while loop that keeps waiting for events in waitForEvent():
void SurfaceFlinger::run() {
    do {
        waitForEvent();
    } while (true);
}
This in turn calls MQ's waitMessage(), where the "steward" takes the stage. Its core is also a while loop, with the real work done by mLooper->pollOnce(-1):
void MessageQueue::waitMessage() {
    do {
        IPCThreadState::self()->flushCommands();
        int32_t ret = mLooper->pollOnce(-1);
        switch (ret) {
            case ALOOPER_POLL_WAKE:
            case ALOOPER_POLL_CALLBACK:
                continue;
            case ALOOPER_POLL_ERROR:
                ALOGE("ALOOPER_POLL_ERROR");
            case ALOOPER_POLL_TIMEOUT:
                // timeout (should not happen)
                continue;
            default:
                // should not happen
                ALOGE("Looper::pollOnce() returned unknown status %d", ret);
                continue;
        }
    } while (true);
}
pollOnce() then calls pollInner():
int Looper::pollInner(int timeoutMillis) {
    // Poll.
    // Clear mResponses before polling.
    int result = ALOOPER_POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
    mIdling = true;

    // Wait for events with epoll_wait(); which descriptors are being
    // watched is determined by the earlier epoll_ctl() calls.
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // Events have arrived.
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        // Two kinds of events.
        // The first kind: a Message.
        if (fd == mWakeReadPipeFd) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
            }
        } else {
            // The second kind: a Vsync signal.
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;

    // Handle Messages.
    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();
                handler->handleMessage(message);
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = ALOOPER_POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    // Handle Vsync signals.
    // Invoke all response callbacks.
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == ALOOPER_POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                    this, response.request.callback.get(), fd, events, data);
#endif
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd);
            }
            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear();
            result = ALOOPER_POLL_CALLBACK;
        }
    }
    return result;
}
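The return-value convention of the fd callbacks invoked in pollInner() is easy to miss: returning 0 from handleEvent() makes the Looper call removeFd() on that descriptor. A minimal sketch of that contract, with a simplified stand-in interface (FdCallback and OneShotCallback are illustrative names, not the real Android classes):

```cpp
#include <cassert>

// Simplified stand-in for the LooperCallback interface: a registered fd
// callback returns non-zero to stay registered, or 0 to be removed.
struct FdCallback {
    virtual ~FdCallback() {}
    virtual int handleEvent(int fd, int events, void* data) = 0;
};

// A one-shot callback: handles the first event, then asks the loop to
// unregister it by returning 0 (the pollInner() code would then call
// removeFd() for this descriptor).
struct OneShotCallback : FdCallback {
    bool fired = false;
    int handleEvent(int /*fd*/, int /*events*/, void* /*data*/) override {
        fired = true;
        return 0; // 0 => remove this fd from the epoll set
    }
};
```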
The code above shows that epoll_wait() waits for two kinds of signals, Messages and Vsync events, so epoll must be watching two descriptors. Where were they added?
epoll watches the fds
First, the Looper constructor creates the two descriptors shown in the figure, mWakeReadPipeFd and mWakeWritePipeFd, the read and write ends of a pipe, and adds the read end mWakeReadPipeFd to the watched set via epoll_ctl(). This descriptor corresponds to Message events.
Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    int wakeFds[2];
    // Create a pipe: what one end writes, the other end can read.
    int result = pipe(wakeFds);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno);

    mWakeReadPipeFd = wakeFds[0];
    mWakeWritePipeFd = wakeFds[1];

    result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d",
            errno);

    result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d",
            errno);

    mIdling = false;

    // Allocate the epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);

    struct epoll_event eventItem;
    memset(&eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeReadPipeFd;
    // Start watching the read end.
    result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, &eventItem);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d",
            errno);
}
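The wake-pipe setup above can be reproduced in isolation. The sketch below (WakePipe, makeWakePipe, pollAfterWake are illustrative names, not from the Android sources) shows the same pattern on plain Linux: create a pipe, make both ends non-blocking, register the read end with epoll, and observe that one write to the other end makes epoll_wait() return:

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <cassert>

// Minimal sketch of the Looper wake-pipe setup.
struct WakePipe {
    int epollFd;
    int readFd;
    int writeFd;
};

WakePipe makeWakePipe() {
    WakePipe wp{};
    int fds[2];
    int rc = pipe(fds);
    assert(rc == 0);
    wp.readFd = fds[0];
    wp.writeFd = fds[1];
    // Non-blocking, as in Looper's constructor, so a wake never stalls.
    fcntl(wp.readFd, F_SETFL, O_NONBLOCK);
    fcntl(wp.writeFd, F_SETFL, O_NONBLOCK);
    wp.epollFd = epoll_create(8);
    assert(wp.epollFd >= 0);
    epoll_event item{};
    item.events = EPOLLIN;
    item.data.fd = wp.readFd;
    rc = epoll_ctl(wp.epollFd, EPOLL_CTL_ADD, wp.readFd, &item);
    assert(rc == 0);
    return wp;
}

// Writing one byte to the write end makes epoll_wait() report the read
// end as ready -- the same mechanism Looper::wake() relies on.
int pollAfterWake(const WakePipe& wp) {
    write(wp.writeFd, "W", 1);
    epoll_event events[8];
    return epoll_wait(wp.epollFd, events, 8, 1000);
}
```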
Second, in MessageQueue::setEventThread(), epoll registers another descriptor: the read end of the BitTube mEventTube. The corresponding write end lives in EventThread's mDisplayEventConnections.
void MessageQueue::setEventThread(const sp<EventThread>& eventThread)
{
    mEventThread = eventThread;
    // First create a complete Connection, whose BitTube holds both the
    // read and the write descriptor.
    mEvents = eventThread->createEventConnection();
    // Obtain the channel holding the read descriptor of mEvents.
    mEventTube = mEvents->getDataChannel();
    // Register the read descriptor with epoll.
    mLooper->addFd(mEventTube->getFd(), 0, ALOOPER_EVENT_INPUT,
            MessageQueue::cb_eventReceiver, this);
}
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
    int epollEvents = 0;
    if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
    if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;

    { // acquire lock
        AutoMutex _l(mLock);
        // Bundle the fd to watch, the callback, etc. into a Request.
        Request request;
        request.fd = fd;
        request.ident = ident;
        request.callback = callback;
        request.data = data;

        struct epoll_event eventItem;
        memset(&eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
        eventItem.events = epollEvents;
        eventItem.data.fd = fd;

        ssize_t requestIndex = mRequests.indexOfKey(fd);
        if (requestIndex < 0) {
            // Start watching the descriptor.
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, &eventItem);
            if (epollResult < 0) {
                ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
                return -1;
            }
            // Store the request in the key/value map mRequests.
            mRequests.add(fd, request);
        } else {
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, &eventItem);
            if (epollResult < 0) {
                ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
                return -1;
            }
            mRequests.replaceValueAt(requestIndex, request);
        }
    } // release lock
    return 1;
}
The pipe fd makes epoll_wait return
Now that the read ends of the pipe and the BitTube are registered, epoll_wait() returns whenever their corresponding write ends are written to, and message handling proceeds. First look at how the pipe's write end gets triggered. Sending a message to SF (only asynchronous messages are analyzed here) generally goes through SF's postMessageAsync(), whose parameter is a MessageBase; so a message destined for SF is first wrapped in a MessageBase subclass, as introduced earlier.
status_t SurfaceFlinger::postMessageAsync(const sp<MessageBase>& msg,
        nsecs_t reltime, uint32_t flags) {
    return mEventQueue.postMessage(msg, reltime);
}
This eventually calls Looper's sendMessageAtTime():
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
        const Message& message) {
#if DEBUG_CALLBACKS
    ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d",
            this, uptime, handler.get(), message.what);
#endif

    size_t i = 0;
    { // acquire lock
        AutoMutex _l(mLock);
        // mMessageEnvelopes is a Vector holding all pending messages,
        // sorted by delivery time; if it is empty, i stays 0.
        size_t messageCount = mMessageEnvelopes.size();
        while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
            i += 1;
        }
        // Each message to be handled is wrapped in a MessageEnvelope.
        MessageEnvelope messageEnvelope(uptime, handler, message);
        mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

        // Optimization: If the Looper is currently sending a message, then we can skip
        // the call to wake() because the next thing the Looper will do after processing
        // messages is to decide when the next wakeup time should be.  In fact, it does
        // not even matter whether this code is running on the Looper thread.
        if (mSendingMessage) {
            return;
        }
    } // release lock

    // Wake the poll loop only when we enqueue a new message at the head.
    // Only when the message is inserted at the head of mMessageEnvelopes
    // do we wake epoll_wait() to process messages.
    if (i == 0) {
        wake();
    }
}
In wake() we can see a write to mWakeWritePipeFd, which is what makes epoll_wait() return. The code above also shows that not every incoming message is processed immediately: mMessageEnvelopes is sorted by delivery time, and only when a message is inserted at the head, meaning it is due sooner than everything already queued, is epoll_wait() woken; the remaining messages stay pending. This is an efficiency trade-off, so that each wakeup handles neither too many nor too few messages.
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif

    ssize_t nWrite;
    do {
        nWrite = write(mWakeWritePipeFd, "W", 1);
    } while (nWrite == -1 && errno == EINTR);

    if (nWrite != 1) {
        if (errno != EAGAIN) {
            ALOGW("Could not write wake signal, errno=%d", errno);
        }
    }
}
The BitTube fd makes epoll_wait return
When a Vsync signal arrives, EventThread is woken from its sleep and calls Connection's postEvent(); this Connection holds the write descriptor of the BitTube.
bool EventThread::threadLoop() {
    DisplayEventReceiver::Event event;
    Vector<sp<EventThread::Connection> > signalConnections;
    signalConnections = waitForEvent(&event);
    // A vsync signal has arrived; the thread is awake again.

    // dispatch events to listeners...
    const size_t count = signalConnections.size();
    for (size_t i = 0; i < count; i++) {
        const sp<Connection>& conn(signalConnections[i]);
        // now see if we still need to report this event
        // Call Connection's postEvent(); this Connection holds the
        // BitTube's write descriptor.
        status_t err = conn->postEvent(event);
    }
    return true;
}
This writes into the BitTube's write descriptor, which readies the read descriptor on the other end and wakes epoll_wait().
status_t EventThread::Connection::postEvent(
        const DisplayEventReceiver::Event& event) {
    ssize_t size = DisplayEventReceiver::sendEvents(mChannel, &event, 1);
    return size < 0 ? status_t(size) : status_t(NO_ERROR);
}