天天看點

android graphic(6)—surfaceflinger和MessageQueue

http://blog.csdn.net/lewif/article/details/50586030

目錄(?)[+]

  • MessageQueue等待消息
  • epoll監聽fd
  • pipe fd導緻epoll_wait傳回
  • BitTube fd導緻epoll_wait傳回

MessageQueue(簡稱為MQ)是surfaceflinger(簡稱為SF)主線程中消息處理的“管家”,所有子線程要和主線程打交道都需要通過MQ,例如發送消息,發送Vsync信号等,這裡主要分析MQ具體的實作流程。 

下面這幅圖是MQ處理消息的一個大概流程,下面根據其中的内容展開(黃色部分表示類名,綠色為類成員),MQ主要處理兩類事件,一種是Message,一種是Event(Vsync),如圖中的①②所示,圖中epoll_wait()下的P表示pipe描述符,B表示BitTube描述符。

MessageQueue等待消息

在前面分析過,SF程序其實核心就是個接收消息,然後處理的過程。在SF啟動過程中,最後會去執行run()函數,是個while循環,一直在等待事件的到來waitForEvent(),

void SurfaceFlinger::run() {

    do {

        waitForEvent();

    } while (true);

}

進而去調用MQ的waitMessage(),“大管家”直接出場,其核心也是個while循環,處理函數為mLooper->pollOnce(-1),

voidMessageQueue::waitMessage(){

    do {

        IPCThreadState::self()->flushCommands();

        int32_t ret= mLooper->pollOnce(-1);

        switch (ret) {

            caseALOOPER_POLL_WAKE:

            caseALOOPER_POLL_CALLBACK:

                continue;

            caseALOOPER_POLL_ERROR:

               ALOGE("ALOOPER_POLL_ERROR");

            caseALOOPER_POLL_TIMEOUT:

                // timeout (shouldnot happen)

                continue;

            default:

                // should not happen

               ALOGE("Looper::pollOnce()returned unknown status %d", ret);

                continue;

        }

    } while (true);

}

而pollOnce(),會去調用pollInner(),

intLooper::pollInner(int timeoutMillis){

    // Poll.

    //poll之前先把mResponses清空

    int result =ALOOPER_POLL_WAKE;

   mResponses.clear();

    mResponseIndex= 0;

    // We are about to idle.

    mIdling = true;

    //通過epoll_wait等到事件的到來,監聽了哪些描述符,需要去找epoll_ctl()

    struct epoll_eventeventItems[EPOLL_MAX_EVENTS];

    int eventCount =epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    //事件到來

    for (int i = 0; i <eventCount; i++) {

        int fd =eventItems[i].data.fd;

        uint32_tepollEvents = eventItems[i].events;

        //分兩類事件

        //第一種為消息

        if (fd ==mWakeReadPipeFd) {

            if (epollEvents& EPOLLIN) {

               awoken();

            } else {

               ALOGW("Ignoringunexpected epoll events 0x%x on wake read pipe.", epollEvents);

            }

        } else {

            //第二種為Vsync信号

            ssize_trequestIndex = mRequests.indexOfKey(fd);

            if (requestIndex>= 0) {

                int events = 0;

                if (epollEvents& EPOLLIN) events |= ALOOPER_EVENT_INPUT;

                if (epollEvents& EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;

                if (epollEvents& EPOLLERR) events |= ALOOPER_EVENT_ERROR;

                if (epollEvents& EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;

               pushResponse(events, mRequests.valueAt(requestIndex));

            } else {

               ALOGW("Ignoringunexpected epoll events 0x%x on fd %d that is "

                       "no longerregistered.", epollEvents, fd);

            }

        }

    }

Done: ;

     //處理消息

    // Invoke pending message callbacks.

   mNextMessageUptime = LLONG_MAX;

    while(mMessageEnvelopes.size() != 0) {

        nsecs_t now= systemTime(SYSTEM_TIME_MONOTONIC);

        constMessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);

        if (messageEnvelope.uptime <= now) {

            // Remove the envelope from the list.

            // We keep a strong reference to thehandler until the call to handleMessage

            // finishes.  Then we drop it so that the handler can bedeleted *before*

            // we reacquire our lock.

            { // obtain handler

               sp<MessageHandler> handler = messageEnvelope.handler;

               Message message = messageEnvelope.message;

               mMessageEnvelopes.removeAt(0);

               mSendingMessage = true;

               mLock.unlock();

               handler->handleMessage(message);

            } // release handler

            mLock.lock();

           mSendingMessage = false;

            result= ALOOPER_POLL_CALLBACK;

        } else {

            // The last message left at the head ofthe queue determines the next wakeup time.

           mNextMessageUptime = messageEnvelope.uptime;

            break;

        }

    }

    // Release lock.

    mLock.unlock();

    //處理Vsync信号

    // Invoke all response callbacks.

    for (size_t i = 0; i <mResponses.size(); i++) {

       Response& response = mResponses.editItemAt(i);

        if(response.request.ident == ALOOPER_POLL_CALLBACK) {

            int fd = response.request.fd;

            int events =response.events;

            void* data =response.request.data;

#ifDEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS

            ALOGD("%p ~ pollOnce - invoking fd eventcallback %p: fd=%d, events=0x%x, data=%p",

                   this,response.request.callback.get(), fd, events, data);

#endif

            int callbackResult= response.request.callback->handleEvent(fd, events, data);

            if (callbackResult== 0) {

               removeFd(fd);

            }

            // Clear the callback reference in theresponse structure promptly because we

            // will not clear the response vectoritself until the next poll.

           response.request.callback.clear();

            result= ALOOPER_POLL_CALLBACK;

        }

    }

    return result;

}

從上面的代碼可以看出,epoll_wait()等待了兩類信号的到來,一種是message,一種是Vsync event,那麼肯定是epoll監聽了兩個描述符,那麼都是在哪裡添加的呢?

epoll監聽fd

首先在Looper的構造函數中,建立了圖中所示的兩個描述符mWakeReadPipeFd ,mWakeWritePipeFd 分别對應pipe的讀和寫,并且将讀描述符mWakeReadPipeFd通過epoll_ctl添加到監聽的描述符中。這個描述符所對應的是Message消息。

Looper::Looper(bool allowNonCallbacks) :

       mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),

       mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {

    int wakeFds[2];

    //建立一個管道,一個寫東西另外一個就有東西讀了,

    int result = pipe(wakeFds);

    LOG_ALWAYS_FATAL_IF(result!= 0, "Could not create wake pipe.  errno=%d", errno);

    mWakeReadPipeFd= wakeFds[0];

   mWakeWritePipeFd = wakeFds[1];

    result = fcntl(mWakeReadPipeFd,F_SETFL, O_NONBLOCK);

   LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipenon-blocking.  errno=%d",

            errno);

    result = fcntl(mWakeWritePipeFd,F_SETFL, O_NONBLOCK);

   LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipenon-blocking.  errno=%d",

            errno);

    mIdling =false;

    // Allocate the epoll instance and register thewake pipe.

    mEpollFd =epoll_create(EPOLL_SIZE_HINT);

   LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);

    structepoll_event eventItem;

    memset(&eventItem, 0,sizeof(epoll_event)); // zero out unused members of data field union

   eventItem.events = EPOLLIN;

   eventItem.data.fd = mWakeReadPipeFd;

    //監聽

    result =epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);

   LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epollinstance.  errno=%d",

            errno);

}

其次在Looper的setEventThread()函數中,epoll注冊了另外一個描述符,這個描述符是對應BitTube mEventTube中的讀描述符,而對應的寫描述符在EventThread的mDisplayEventConnections中。

void MessageQueue::setEventThread(const sp<EventThread>& eventThread)

{

    mEventThread = eventThread;

    //首先建立一個完整的Connection,裡面的BitTube中讀寫描述符都在

    mEvents = eventThread->createEventConnection();

    //重建一個Connection,裡面包含了mEvents的讀描述符

    mEventTube = mEvents->getDataChannel();

    //把讀描述符注冊到epoll監聽

    mLooper->addFd(mEventTube->getFd(), 0,ALOOPER_EVENT_INPUT,

           MessageQueue::cb_eventReceiver, this);

}

int Looper::addFd(int fd, int ident, int events, constsp<LooperCallback>& callback, void* data) {

    int epollEvents = 0;

    if (events &ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;

    if (events &ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;

    { // acquire lock

        AutoMutex_l(mLock);

    //首先把要監聽的fd,回調函數等建構一個Request

        Requestrequest;

        request.fd= fd;

       request.ident = ident;

       request.callback = callback;

       request.data = data;

        struct epoll_eventeventItem;

        memset(&eventItem, 0, sizeof(epoll_event)); // zero out unused members of data fieldunion

        eventItem.events= epollEvents;

       eventItem.data.fd = fd;

        ssize_trequestIndex = mRequests.indexOfKey(fd);

        if (requestIndex< 0) {

             //監聽描述符

            int epollResult =epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);

            if (epollResult< 0) {

               ALOGE("Erroradding epoll events for fd %d, errno=%d", fd, errno);

                return -1;

            }

            //将request放到鍵值對mRequests中

           mRequests.add(fd, request);

        } else {

            int epollResult =epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);

            if (epollResult< 0) {

               ALOGE("Errormodifying epoll events for fd %d, errno=%d", fd, errno);

                return -1;

            }

           mRequests.replaceValueAt(requestIndex, request);

        }

    } // release lock

    return1;

}

pipe fd導緻epoll_wait傳回

上面注冊了pipe和BitTube的讀描述符,那麼當這兩個對應的寫描述符有寫操作時,epoll_wait()就會傳回,然後進行消息的處理。首先分析pipe對應的寫描述符的激活,一般給SF發消息(隻分析異步消息),都是去調用SF的postMessageAsync()函數,這個函數的入參是MessageBase類,是以給SF發消息首先會對消息進行封裝,封裝為MessageBase的子類,前面介紹過。

status_t SurfaceFlinger::postMessageAsync(const sp<MessageBase>&msg,

        nsecs_treltime, uint32_t flags) {

    returnmEventQueue.postMessage(msg, reltime);

}

最終回去調用Looper的sendMessageAtTime,

voidLooper::sendMessageAtTime(nsecs_t uptime, constsp<MessageHandler>& handler,

        const Message&message) {

#ifDEBUG_CALLBACKS

    ALOGD("%p ~ sendMessageAtTime -uptime=%lld, handler=%p, what=%d",

            this, uptime,handler.get(),message.what);

#endif

    size_t i = 0;

    { // acquire lock

        AutoMutex_l(mLock);

        //有個儲存所有消息的Vector,

        //如果mMessageEnvelopes這裡面沒有消息,則i=0

        //mMessageEnvelopes中的消息按觸發時間的先後順序排列

        size_tmessageCount = mMessageEnvelopes.size();

        while (i <messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {

            i += 1;

        }

    //處理的消息都被封裝為MessageEnvelope,

       MessageEnvelope messageEnvelope(uptime, handler, message);

       mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

        // Optimization: If the Looper iscurrently sending a message, then we can skip

        // the call to wake() because the nextthing the Looper will do after processing

        // messages is to decide when the nextwakeup time should be.  In fact, it does

        // not even matter whether this code isrunning on the Looper thread.

        if(mSendingMessage) {

            return;

        }

    } // release lock

    // Wake the poll loop only when weenqueue a new message at the head.

    // 如果i=0,則去調用wake(),

    // 隻有我們把這個消息插入到MessageEnvelope頭部時,才會去激活epoll_wait()傳回處理消息

    if (i == 0) {

        wake();

    }

}

在weak()函數中我們看到有寫mWakeWritePipeFd的操作,進而會導緻epoll_wait傳回。從上面的代碼能夠看到并不是每來一個消息都會去馬上處理,mMessageEnvelopes中的消息按觸發時間的先後順序排列,如果我們插入到頭部,表明新加入的這個消息是需要最近處理的,mMessageEnvelopes中後面還有其他待處理的消息,這時候才會去激活epoll_wait()傳回,這種處理方式主要是出于效率的考慮,一次處理的消息不能太多也不能太少。

void Looper::wake(){

#ifDEBUG_POLL_AND_WAKE

    ALOGD("%p ~ wake", this);

#endif

    ssize_t nWrite;

    do {

        nWrite =write(mWakeWritePipeFd, "W", 1);

    } while (nWrite == -1 &&errno == EINTR);

    if (nWrite != 1) {

        if (errno !=EAGAIN) {

            ALOGW("Could not write wake signal,errno=%d", errno);

        }

    }

}

BitTube fd導緻epoll_wait傳回

當Vsync信号的到來時,EventThread從睡眠中打斷,調用Connection的postEvent函數,這個Connection裡面儲存了BitTube的寫描述符。

boolEventThread::threadLoop() {

   DisplayEventReceiver::Event event;

    Vector<sp<EventThread::Connection> > signalConnections;

   signalConnections = waitForEvent(&event);

    //vsync信号到來,睡醒了

    // dispatch events to listeners...

    const size_t count =signalConnections.size();

    for (size_t i=0 ; i<count ;i++) {

        constsp<Connection>& conn(signalConnections[i]);

        // now see if we still need to reportthis event

        //調用Connection的postEvent函數,這個Connection裡面儲存了BitTube的寫描述符

        status_terr = conn->postEvent(event);

    }

    returntrue;

}

進而會往BitTube的寫描述符中寫東西,對應對端的讀描述符被激活,epoll_wait()激活。

status_t EventThread::Connection::postEvent(

        constDisplayEventReceiver::Event& event) {

    ssize_t size =DisplayEventReceiver::sendEvents(mChannel, &event, 1);

    returnsize < 0 ? status_t(size) :status_t(NO_ERROR);

}