天天看點

handler ,Looper的機制,分析源碼(二)消息的收發

之前從使用handler入手,順了下handler線程切換的流程。

位址:

http://blog.csdn.net/y1962475006/article/details/52243671

這篇需要詳細分析各部分的關系和工作。

一、handler、Looper、message、messagequeue的關系

先看一個UML圖:

handler ,Looper的機制,分析源碼(二)消息的收發

可以看到它們之間都是依賴:

Handler -> Looper

Handler -> MessageQueue(其實這個依賴是取自Looper的)

Looper -> MessageQueue

Looper -> Thread

MessageQueue -> Message

Message -> Message

Message -> Handler

從下到上,從這個UML圖中可以看出:

  • Message 是一個單連結清單結構;
  • MessageQueue直接管理Message;
  • Looper 持有着線程資訊;
  • Looper 和 Handler 可以操作MessageQueue;

“MessageQueue”是由單連結清單實作的隊列。

從構造函數說起

1、 Looper

private Looper(boolean quitAllowed) {
        mQueue = new MessageQueue(quitAllowed);
        mThread = Thread.currentThread();
    }
           

可以看到,在Looper構造的時候,執行個體了MessageQueue和Thread。同時,構造函數是私有的,是以不能由外部new 一個Looper。隻能通過prepare方法生成。

private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper(quitAllowed));
    }
           

ThreadLocal 是一個線程次元的資料結構,就是說,每個線程都有一份它存儲的變量的副本,互不幹擾。有興趣可以看一下它實作的源碼。

2、MessageQueue

MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        mPtr = nativeInit();
    }
           

構造參數就一個quitAllowed,字面意思就是是否允許退出。這裡先不管他。重頭戲是

mPtr = nativeInit();
           

這是一個native方法。mPtr 是一個long型。那就看看這個native方法(frameworks/base/core/jni/android_os_MessageQueue.cpp):

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }

    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}
           

有趣的是,在nativeInit方法中,又new了一個NativeMessageQueue,也就是一個本地消息隊列,并且傳回了這個本地消息隊列的位址偏移量。于是,mPtr就被指派為這個偏移量。到此,Java層的MessageQueue就有了NativeMessage的對應關系,後續可以通過mPtr偏移量找到NativeMessageQueue。

再看看這個NativeMessageQueue的構造函數:

NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}
           

啊哈?這裡又來了一個Looper?NO NO ,這并不是我們之前的java層的Looper,而是C++ native層的。而這個Looper,也是和線程綁定的。

到此,java層和native層各有一個MessageQueue 和Looper,而且都是和線程一對一的。隻不過java 層、native層兩者的關系相反:java層 Looper依賴MessageQueue ,native層MessageQueue依賴Looper。

handler ,Looper的機制,分析源碼(二)消息的收發

發送過程

上一篇說到過,handler的發送,最後都走到了handler.enqueueMessage:

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
        msg.target = this;
        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }
           

然後是queue.enqueueMessage(msg, uptimeMillis)。

來看:

boolean enqueueMessage(Message msg, long when) {
 <!-- 如果target即messgae沒有綁定Handler,會直接抛異常退出;-->
        if (msg.target == null) {
            throw new IllegalArgumentException("Message must have a target.");
        }
        <!-- 如果msg标記為正在使用,退出-->

        if (msg.isInUse()) {
            throw new IllegalStateException(msg + " This message is already in use.");
        }
                synchronized (this) {
                 <!-- 如果線程正在退出,傳回false并回收msg-->
            if (mQuitting) {
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;

            }
         <!-- 下面是正常流程-->
          <!--标記msg為使用狀态-->
            msg.markInUse();

            msg.when = when;
            Message p = mMessages;
            boolean needWake;
                <!--  如果隊列是空的,新來的msg作為單連結清單的頭-->
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;

            } else {
             <!--如果隊列不為空,msg消息插入隊列,是按照時間順序插入的,也就是說隊列是按時間由小到大排序-->
                // Inserted within the middle of the queue.  Usually we don't have to wake
                // up the event queue unless there is a barrier at the head of the queue
                // and the message is the earliest asynchronous message in the queue.
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    <!--找到第一個時間比入隊時間大的位置-->
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;

            }

            // We can assume mPtr != 0 because mQuitting is false.
            <!--    如果有必要就喚醒隊列-->
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }
           

插入的Message按時間排了序,也就是說,MessageQueue是由單連結清單實作的按時間排序的隊列。這點很重要。

“如果有必要就喚醒隊列”是根據needWake這個字段判斷的。隻有在隊列目前處在阻塞狀态并且設定了屏障(target==null)、msg是異步消息的時候,才會去喚醒。看下native的實作:

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}
           

呀呀,NativeMessageQueue又通過java 層傳過來的mptr生成的,并且發現除了nativeInit()外所有的本地方法都是要傳mPtr過去的,這也證明了,java 層通過mptr這個偏移量找C++層的對應隊列。瞧一瞧NativeMessageQueue的wake():

void NativeMessageQueue::wake() {
    mLooper->wake();
}
           

調用了Looper 的wake。

回顧發送的過程,可以發現,enqueueMessage的過程,隻是在Java層把Message執行個體插入了單連結清單,native層做的事隻是底層隊列的喚醒與否。

讀取Message

上一篇說到過,讀取message是通過Looper.loop(),開啟無限循環讀取的,loop()方法裡,是通過queue.next()拿到message的。我們看這個方法:

Message next() {
        // Return here if the message loop has already quit and been disposed.
        // This can happen if the application tries to restart a looper after quit
        // which is not supported.
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }

        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        <!--開始進入循環-->
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }
            <!--設定休眠,0表示不休眠-->
            nativePollOnce(ptr, nextPollTimeoutMillis);

            synchronized (this) {
                // Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
               <!-- 如果消息隊列不為空,且設定了屏障,周遊找出下一個異步的消息-->
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {
                <!--如果消息隊列不空且,目前要處理的消息設定的處理時間比現在要大,就是還沒到要處理它,設定等待時間。-->
                    if (now < msg.when) {
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                    <!--如果可以處理這個消息了,取出這個msg,并标記為使用中-->
                        // Got a message.
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        msg.markInUse();
                        return msg;
                    }
                } else {
                <!--消息隊列為空,進入閑等待-->
                    // No more messages.
                    nextPollTimeoutMillis = -1;
                }
                <!--後面的省略。。。-->

        } 
           

注釋中解釋了大部分和消息擷取相關的代碼。其中nativePollOnce 是一個native方法,表示等待或者說休眠多久。nextPollTimeoutMillis 表示下次循環的時候,需要等待的時間,0表示不等待,-1表示無限等待。但是無限等待不就是死循環嗎?那不就導緻ANR了?這裡應該了解忙等待和閑等待。舉個例子,現在有的手扶電梯為了節能,在沒有人的時候會是停着的,這就是閑等待,如果有人站上去,就會喚醒運作;如果電梯上沒有位置了,那麼後來的人就要等待,這就是忙等待。

看nativePollOnce的代碼:

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
           

對應的:

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}
           

發現最後調用了Looper的pollOnce方法。

native 層Looper

到這裡估計就有人問了。為什麼MessageQueuen的native 方法分析了,不管發送和讀取到Looper這就停了?

其實是故意的哈哈。上面分析到,發送消息,在java層,就是msg丢到單連結清單裡,然後native層Looper喚醒線程;讀取消息,在java層,MessageQueue取出需要處理的消息,然後native層Looper設定等待。最後都落在了Looper,由Looper處理線程的喚醒和等待。

那麼就看一下native層Looper(5.0以上在system/core/libutils/Looper.cpp)的構造方法:

Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
                        strerror(errno));

    AutoMutex _l(mLock);
    rebuildEpollLocked();
}
           

可以看到,這裡用的是linux系統中的epoll機制。epoll是一個Linux下的IO多路複用的機制,可以監聽fd以及fd上的事件例如讀寫打開關閉,當有事件時傳回,沒事件時阻塞等待喚醒或者逾時。更詳細的這裡不作詳解。但是要留意mWakeEventFd,這個就是和喚醒有關的fd。

直接看Looper->pollOnce() 和 Looper->wake()。

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p",
                        this, ident, fd, events, data);
#endif
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}
           

前面那些可以先不管,直接到最後一句result = pollInner(timeoutMillis); 因為隻有這一句涉及到了時間:

int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif

    // Adjust the timeout based on when the next message is due.
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",
                this, mNextMessageUptime - now, timeoutMillis);
#endif
    }

    // Poll.
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
    mPolling = true;

    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    <!--阻塞到這裡,如果有事件就緒或者逾時,會從阻塞狀态推出,eventcount>0,eventItems包含了就緒的事件fd-->
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    // Rebuild epoll set if needed.
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }
    <!--小于0認為出錯-->
    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
        result = POLL_ERROR;
        goto Done;
    }
    <!--等于零認為逾時-->
    // Check for poll timeout.
    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - timeout", this);
#endif
        result = POLL_TIMEOUT;
        goto Done;
    }

    // Handle all events.
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
<!--大于0的時候,周遊就緒的fd,如果有mWakeEventFd,且事件是EPOLLIN(可讀取或者socket關閉),去讀它。-->
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;

    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                        this, handler.get(), message.what);
#endif
                handler->handleMessage(message);
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    // Invoke all response callbacks.
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                    this, response.request.callback.get(), fd, events, data);
#endif
            // Invoke the callback.  Note that the file descriptor may be closed by
            // the callback (and potentially even reused) before the function returns so
            // we need to be a little careful when removing the file descriptor afterwards.
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }

            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    return result;
}
           

awoken方法:

void Looper::awoken() {
        #if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ awoken", this);
        #endif

            uint64_t counter;
            TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
        }
           

最後對mWakeEventFd做了讀取。

是以到這裡,阻塞從jave的Looper.loop()->MessageQueue.next()->nativePollOnce()->native Looper.pollOnce()->Looper.pollInner()->epoll_wait();最終停在了epoll這裡。

再看Looper.wake():

void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif

    uint64_t inc = 1;
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",
                    mWakeEventFd, strerror(errno));
        }
    }
}
           

這個代碼很簡單,隻要往mWakeEventFd寫入。就緒時,核心會通知給epoll_wait,程序再次調用時,epoll_wait會傳回這個fd。再回顧一下取消息的過程(僞代碼):

Looper.loop(){
    while(true){
        Message msg = Message.Next(){
            while(true){
                設定逾時阻塞;
                取出單連結清單頭消息;
                if(消息是就緒的){
                    立即傳回該消息;
                }else{
                    更新下次阻塞時間;
                    continue;
                }
            }
        }
        分發msg
    }
}
           

發現花括号裡兩個無限循環,由于message單連結清單是按時間由小到大排序的,就緒的消息一出隊列就立馬傳回分發了,剩下未就緒的通過epoll_wait 等待設定時間逾時(傳回0)或者有新的可就緒消息入隊列(mWakeEventFd進行寫操作),這個過程CPU去做其他事,是個閑等待,這裡說的阻塞是指阻塞了操作MessageQueue的線程。當時間逾時或者有新的消息入隊列的時候,epoll_wait 會退出阻塞,線程變為活躍态,循環繼續。

這個過程中你也許看到了native層的Message會Handler。沒錯,native層也是有的,而且作用和java層的一樣,隻不過是用來處理其他硬體裝置的fd的,我們不用考慮。

總結

到此為止,分析了java 層和native層的消息發送和讀取的源碼過程:

java 層負責對Message這個單連結清單進行插入和删除操作,native負責線程的喚醒和休眠。

最後上一個圖:

handler ,Looper的機制,分析源碼(二)消息的收發

綠色是發送流程,黃色是讀取流程。