天天看點

Handler消息機制詳解,另對于MessageQueue阻塞線程的詳解

概述

  android中非主線程是不能進行UI操作的,而且在主線程中也不能進行耗時操作。那麼當需要進行耗時操作後再更新UI界面又該怎麼辦呢?

  這裡就涉及到了線程間的消息傳遞機制,也就是Handler機制。通過Handler實作線程間的消息傳遞,并且我們常用的方法Activity.runOnUiThread(Runnable)方式進行的UI界面的更新實際上也是使用的Handler方式。

  該過程涉及四個對象,Handler,Message,Looper,MessageQueue。那麼這四者又是如何配合來達到發送消息的呢?用送信的方式來形容如下:

  • Handler:收信人/發件人,執行具體的操作/發送消息
  • Message:信封,線程發送的消息實體
  • MessageQueue:郵筒,消息隊列,存放消息實體
  • Looper:送信人,從消息隊列中取消息并進行分發
也就是寫信人将信件放到郵筒中,送信人從郵筒中拿出送給收信人。

舉例

  先來看一下不采用這套機制而是直接在子線程中操作UI的結果。

btn = findViewById(R.id.btn_act_two);
btn.setOnClickListener(v -> {
      new Thread(() -> {
           btn.setText("test");
      }).start();
});
           

  上面是直接在btn按鈕的點選事件中開啟一個線程,然後線上程中進行UI更新,不出意料抛出異常資訊。

android.view.ViewRootImpl$CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views.

采用Handler機制:

btn = findViewById(R.id.btn_act_two);
    Handler handler = new Handler();
    btn.setOnClickListener(v -> {
        new Thread(() -> {
            handler.post(() -> {
                btn.setText("test");
            });
        }).start();
    });
           

  此時再運作就可以成功更新UI界面了。

分析

  上面就是一個使用Handler傳遞消息的小示例,畢竟下面分析的所有的過程都是為了消息傳遞的,而根據案例來分析則更容易找到切入點。

  那麼就從Handler開始分析。先看一下該過程的使用,首先建立了一個Handler對象,然後在btn的點選事件中調用他的post方法,參數是一個Runnable對象,在其中封裝了要更新UI的操作。

  但是要看Handler的話又要對Looper有一定的了解,那麼不如先從Looper開始分析吧。

Looper:消息傳輸中的動力源泉

private Looper(boolean quitAllowed) {
        mQueue = new MessageQueue(quitAllowed);
        mThread = Thread.currentThread();
    }
           

  Looper将構造方法私有化,并且沒有提供其他的構造方法。也就是說,Looper不能通過new的方法建立對象,那麼它必定有其他的方法可以獲得Looper實體。繼續檢視源碼可以發現,Looper有兩個方法可以得到Looper對象,prepareMainLooper和prepare方法。

//這個方法建立了一個Looper,并且将它作為主線程的Looper
	public static void prepareMainLooper() {
        prepare(false);
        synchronized (Looper.class) {
            if (sMainLooper != null) {
                throw new IllegalStateException("The main Looper has already been prepared.");
            }
            sMainLooper = myLooper();//擷取目前程序對應的Looper
        }
    }
	//擷取目前程序的Looper
	public static @Nullable Looper myLooper() {
        return sThreadLocal.get();
    }
           

  上面的方法是在建立應用時的主線程中調用的,實際還是通過prepare方法建立的Looper。另外由于該方法将建立的Looper作為了主線程的Looper,是以開發者一般情況下是禁止使用這個方法的。那麼建立Looper的prepare又做了些什麼呢?

private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper(quitAllowed));
    }
           

  可以看到,prepare首先會判斷目前線程是否已經建立過Looper,若是建立過則會抛出異常資訊。也就是說,一個線程隻能對應一個Looper,也就是隻能prepare一次。若是沒有建立過,則會建立一個Looper放到ThreadLocal中。而ThreadLocal是按照線程資訊儲存資料的,擷取時也是根據目前線程擷取對應的資料。ThreadLocal是Looper的靜态常量用于存儲程序對應的Looper。

  當調用了prepare建立Looper之後,在ThreadLocal中就會有一個目前線程的Looper對象了。而在Looper的構造方法中,還建立了一個消息隊列MessageQueue,也就是說,每個Looper都對應的有一個消息隊列(每個送信人負責一個郵筒)。

  現在已經有了郵筒了,那麼送信人什麼時候知道有信封被放在了郵筒中呢?當然是時刻守着郵筒啊,也就是Looper将會一直關注着MessageQueue,這在代碼中是通過loop方法的一個死循環實作的。

//删除了部分代碼
public static void loop() {
	//得到目前線程的Looper,是為了擷取對應的MessageQueue
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    final MessageQueue queue = me.mQueue;
    //這裡通過死循環一直從消息隊列中取消息
    for (;;) {
    	//取消息,隊列中沒有消息時,會阻塞在這裡
        Message msg = queue.next(); // might block
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }
        try {
        	//分發消息
            msg.target.dispatchMessage(msg);
            dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
        } finally {
            if (traceTag != 0) {
                Trace.traceEnd(traceTag);
            }
        }
        msg.recycleUnchecked();
    }
}
           

  在loop方法中,for(;;)循環沒有設定結束條件,也就是一個死循環,那麼為什麼不會造成卡頓呢,這就要看MessageQueue的實作了。在MessageQueue的next方法中,若是隊列中沒有Message時,則會阻塞在這裡。是以loop中的這個死循環不會造成系統的卡頓。

  到這裡,基本上Looper的主要操作就看完了。那麼總結一下,Looper是不能通過new來建立的,而是通過prepare()建立,之後調用loop()來開啟循環,阻塞式地從消息隊列中取消息。

  送信人從郵筒取信件的過程已經分析完了,那麼接下來的問題就是信件什麼時候被放在郵筒中的了。

  還記得文章開頭嗎,在子線程中通過Handler更新UI操作,那麼就從Handler來分析一下。按照慣例看一下Handler的構造方法

Handler:消息發送和進行中心
//其中一個構造方法
	public Handler(Callback callback, boolean async) {
        if (FIND_POTENTIAL_LEAKS) {
            final Class<? extends Handler> klass = getClass();
            if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                    (klass.getModifiers() & Modifier.STATIC) == 0) {
                Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
                    klass.getCanonicalName());
            }
        }
		//擷取目前程序對應的Looper
        mLooper = Looper.myLooper();
        if (mLooper == null) {
            throw new RuntimeException(
                "Can't create handler inside thread " + Thread.currentThread()
                        + " that has not called Looper.prepare()");
        }
        mQueue = mLooper.mQueue;
        mCallback = callback;
        mAsynchronous = async;
    }
           

  從構造方法可以看出,在Handler中的一些屬性mHandler,mQueue等都是從Looper中的ThreadLocal中擷取的,而關于構造方法的詳細解釋,會在後文中說明,這裡隻是為了走一遍發送消息的過程。

  建立Handler後,接下來就是post方法,畢竟post中才是更新UI的操作,那麼post方法有做了什麼呢?

public final boolean post(Runnable r){
   return  sendMessageDelayed(getPostMessage(r), 0);
}
           

  這裡直接調用了sendMessageDelayed方法,同時将post的參數runnable進行了封裝。

private static Message getPostMessage(Runnable r) {
    Message m = Message.obtain();
    m.callback = r;
    return m;
}
           

  可以看到,getPostMessage将runnable操作封裝到了Message中,也就是将信紙放在信封中,畢竟信封才是信件的載體。而在Handler中,并不隻是post這一種方式發送消息,而是有一系列的方法可以選擇,下圖總結了Handler中的所有發送消息的方法,并分為了兩類,其中紅色是将消息放在消息隊列的前面(将會優先出隊發送出去),黃色是按照正常放在隊列後面。

Handler消息機制詳解,另對于MessageQueue阻塞線程的詳解

  由圖中各種方法的調用可以看出,不論是何種方法,都是将消息(runnable或者空消息int )封裝在Message中,然後再調用enqueueMessage方法将消息放入消息隊列中。這裡将沒有具體操作(callback==null)的Message稱為空消息。

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this;
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}
           

  繼續追蹤,在enqueueMessage中就做了兩件事,将Message的target指向自己,調用MessageQueue的enqueueMessage方法将消息入隊。這個target是定義在Message中的類型為Handler的變量,用于處理Message事件。

  那麼Handler又是如何處理消息的呢?

public interface Callback {
    public boolean handleMessage(Message msg);
}

public void handleMessage(Message msg) {}
           

  由上面可以看出,在Handler中,聲明了一個接口Callback,裡面隻有一個方法handleMessage,這個方法就是處理Message消息的,Callback在構造方法中傳入。當然,若是在構造方法中沒有傳入Callback的話還有一種方式,Handler中也定義了一個handleMessage方法,預設是個空實作。可以通過重寫這個方法來處理Message消息。

  而消息的處理也是要根據這個來确定的。首先會去判斷這個Message是不是個空消息,即Message的callback字段是否為空,若不是空消息,則直接執行它的callback方法(Runnable.run);若是空消息則由Handler來處理。而Handler處理又分為兩種情況,若是在構造方法中傳入了Callback,則使用Callback的handleMessage來處理,否則使用Handler本身的handleMessage處理消息。

public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        handleCallback(msg);//自己處理
    } else {
    	//handler處理
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        handleMessage(msg);
    }
}

private static void handleCallback(Message message) {
    message.callback.run();
}
           

  上面這個方法就是用來分發消息的,可以看到處理事件的優先過程就是我們分析的這樣。

  這裡直接就到分發消息這一步了,而又是誰dispatchMessage的呢?這就要回到Looper中了,在Looper中我們使用prepare方法建立Looper,然後使用loop方法開啟循環,在循環中阻塞式地從MessageQueue中取Message。當取到Message後,執行了這句:

  而在之前發送消息的分析中可以看到,Handler在enqueueMessage的時候,将msg.target指向了自己this,也就是說分發消息的過程是在Looper中調用的,但還是自己(發送消息的Handler)執行的。這樣的話,不管在哪個線程中發送的消息,當執行這個消息時,其實就已經切換到了Handler中Looper所在的線程。

  現在的問題就到了Looper線程是哪個線程上了。這時候就又要拿出Handler的構造方法了。前面我們就走過場式地分析了一個Handler的構造方法,而Handler一共有7個構造方法。

public Handler() {
    this(null, false);
}

public Handler(Callback callback) {
    this(callback, false);
}

public Handler(Looper looper) {
    this(looper, null, false);
}

public Handler(Looper looper, Callback callback) {
    this(looper, callback, false);
}
   
public Handler(boolean async) {
    this(null, async);
}

public Handler(Callback callback, boolean async) {
    if (FIND_POTENTIAL_LEAKS) {
        final Class<? extends Handler> klass = getClass();
        if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                (klass.getModifiers() & Modifier.STATIC) == 0) {
            Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
                klass.getCanonicalName());
        }
    }
    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread " + Thread.currentThread()
                    + " that has not called Looper.prepare()");
    }
    mQueue = mLooper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
}

public Handler(Looper looper, Callback callback, boolean async) {
    mLooper = looper;
    mQueue = looper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
}
           

  從上面的7個構造方法可以看出,若是沒有指定Looper,則使用執行個體化Handler時所線上程的Looper;若是指定,則使用指定的Looper。

小結

至此,Handler,Message,Looper,MessageQueue四者之間的關系已經清晰了。

  • Message作為消息載體,用于承載消息。
  • MessageQueue作為消息隊列,用于存放消息。
  • Looper作為消息傳送的動力,用于擷取分發消息。
  • Handler作為消息進行中心,用于處理消息和發送消息。

那麼再來看下它們之間的包含關系。

  1. 首先,Message作為消息主體,是獨立的個體。
  2. 其次,MessageQueue用于存放消息,那麼MessageQueue包含Message,對應多個Message。
  3. 而Looper,作為整個消息循環的動力,則包含了MessageQueue,并且一個Looper隻有一個MessageQueue;另外在Looper的靜态成員ThreadLocal中還存放了所有使用這套消息機制的線程的Looper。
  4. 最後是Handler,作為發送消息的主體,肯定需要知道向哪個線程發送消息,也就是需要知道線程對應的MessageQueue,即需要知道線程的Looper。是以,Handler包含Looper。

剩下的就是如何使用這套機制了。

  首先,在需要使用這套消息機制的線程中使用Looper的靜态方法Looper.prepare()來初始化,然後使用Looper.loop()開啟消息循環。這樣,該線程就已經啟用這套機制了,接下來直接使用Handler發送消息就行了。

Handler确定發送消息的目标線程的方式

  系統中可能存在多套消息機制,那麼Handler又如何保證發送的消息确實是被發送到了目标線程呢?因為Handler内部包含Looper,是以發送的消息就是Looper所在的線程。而Looper又是在構造方法中獲得的或者傳遞進來的,是以確定向目标線程發送消息有兩種方式。

  1. 在目标線程中構造Handler,然後将Handler傳遞到想要發送消息的線程中。文章開頭的小案例就是這種方法。
  2. 獲得目标線程的Looper,在構造Handler時将Looper傳遞進去。這種方式不限定Handler執行個體化時所在的線程。

  到這裡的時候,這套消息機制已經很明确了。那麼問題來了,我們既然能夠通過這套機制向UI線程發送消息,就說明UI線程肯定也啟用了這套機制。而這裡又要涉及android下應用程式啟動的過程,這裡隻看一下ActivityThread類中的方法。

//  ....\sdk\sources\android-28\android\app\ActivityThread.java

public static void main(String[] args) {
    ...
    Looper.prepareMainLooper();      
    ...  
    Looper.loop();
    
    throw new RuntimeException("Main thread loop unexpectedly exited");
}
           

  可以看到,在主函數中也是調用了這套機制的,但是使用的并不是Looper.prepare()而是Looper.prepareMainLooper(),在上面的Looper分析中我們知到,prepareMainLooper會将目前線程設定為主線程,也就是UI線程。是以作為開發者一般是被禁止使用這個方法開啟Looper消息機制的。

  之是以我們可以使用Handler向UI線程發送消息,就是因為UI線程在啟動的時候就已經開啟這套消息機制了。

  由于同一個程序中的資料,在不同線程之間是可以共享的。這裡Handler就可以在不同的線程中給目标線程的消息隊列發送消息,然後目标線程處理消息。

注:這裡沒有分析Message和MessageQueue,因為他倆都是作為服務出現的

以上代碼全部取自于api28(Android 9.0)中

補充

  雖然文章是分析四者的關系,但是既然已經從源碼方面分析了Handler和Looper,那至少剩下的兩者也得提一下吧。

Message:消息載體

  Message從名字就可以看出是消息的載體。

public final class Message implements Parcelable {
    //用于差別消息的類型
    public int what;
    //攜帶資料,空消息所攜帶的資料
    public int arg1;
    public int arg2;
    //攜帶資料(程序間通信時隻能攜帶系統定義的parcelable對象,自定義的不行)
    public Object obj;
    //Messenger進行程序間通訊時,用于實作雙向通訊
    public Messenger replyTo;
	//攜帶資料,可存放多條
	Bundle data;
    //消息所攜帶的代碼語句資料
    Runnable callback;
    
    //消息的處理目标對象,用于處理消息
    Handler target;
    //用于标記該Message是否在被使用  
	int flags;
	//存放時間,用于在MessageQueue中實作排序
	long when;
	//用于實作單連結清單,以連結清單實作Message池
    Message next;
    //連結清單頭指針
    private static Message sPool;
    private static int sPoolSize = 0;//池的目前大小
    private static final int MAX_POOL_SIZE = 50;//池的最大容量
	 ...
}
           

  上面是Message的一些字段,是用來存儲攜帶消息的。具體的已經在代碼中标出了。并且這些字段都是public類型,可以直接通過對象通路。

  從一些字段上就可以看出,Message實際上使用了一個資料池來對Message對象進行回收和再利用。是以,雖然Message的構造方法是public的,但是系統建議我們使用obtain方法來擷取對象,因為這樣可以從對象池中擷取Message,避免了多次配置設定對象。

  那麼來看一下obtain是如何獲得Message對象的。

public static Message obtain() {
	synchronized (sPoolSync) {
        if (sPool != null) {
            Message m = sPool;
            sPool = m.next;
            m.next = null;
            m.flags = 0; // clear in-use flag
            sPoolSize--;
            return m;
        }
    }
    return new Message();
}
           

  當開始obtain方法時,首先開啟了一個對象鎖,這樣就避免了可能會造成的混亂情況。然後當mPool不為空的時候,也就是對象池中有資料的時候,會取出單連結清單的表頭Message,然後池内數量減一。唯一需要注意的可能就是在操作對象池的時候需要注意線程的同步。

  當然系統不會隻提供一個obtain方法的,而是一系列的方法,而實際上都是調用這個空參數的方法,隻是在這個基礎上添加了一些參數資料而已。

public static Message obtain()
public static Message obtain(Message orig)
public static Message obtain(Handler h)
public static Message obtain(Handler h, int what) 
public static Message obtain(Handler h, Runnable callback)
public static Message obtain(Handler h, int what, Object obj)
public static Message obtain(Handler h, int what, int arg1, int arg2)
public static Message obtain(Handler h, int what,int arg1, int arg2, Object obj)
           

  上面的一些方法也不用解釋了,隻是看參數基本上就明白意思了。

  既然Message是由對象池來産生的,那麼也需要關注一下它的回收過程。

public void recycle() {
    if (isInUse()) {
        if (gCheckRecycle) {
            throw new IllegalStateException("This message cannot be recycled because it "
                    + "is still in use.");
        }
        return;
    }
    recycleUnchecked();
}
           

  這個方法首先檢查一下目前要回收的Message是否是正在使用的。然後調用recycleUnchecked方法進行回收。

  是以,當我們使用完一個Message時,也要通過message.recycle()來将已經不使用的Message回收。

void recycleUnchecked() {
    // Mark the message as in use while it remains in the recycled object pool.
    // Clear out all other details.
    flags = FLAG_IN_USE;
    what = 0;
    arg1 = 0;
    arg2 = 0;
    obj = null;
    replyTo = null;
    sendingUid = -1;
    when = 0;
    target = null;
    callback = null;
    data = null;

    synchronized (sPoolSync) {
        if (sPoolSize < MAX_POOL_SIZE) {
            next = sPool;
            sPool = this;
            sPoolSize++;
        }
    }
}
           

  recycleUnchecked回收之前先重置Message的狀态,包括設定為未使用狀态和清空所寫攜帶的資料。可以看到,在将Message放回對象池的時候會首先判對象池的容量是否已經滿了,隻有未滿的時候才會回收進對象池,否則将會丢棄等待GC的回收。

  Message就是一個簡單的消息實體,沒有什麼複雜的地方,也就是其中使用的Message池值得關注一點。

MessageQueue:存放消息的隊列

  其實MessageQueue内部還是比較複雜的,但是這裡我們隻作簡單分析一下。由名字可以看出,MessageQueue是一個隊列類型的資料結構,那麼總體上肯定就是先進先出的通路順序。而從源碼中我們會發現,這個隊列的實作方式和Message的對象池一樣,也是由Message連接配接而成的單連結清單。還記得Message有個屬性long when,裡面存放的就是放入MessageQueue的時間,而MessageQueue會按照when的大小将Message隊列進行排序。

  老規矩,從構造方法開始看起,但是MessageQueue的構造方法什麼都沒有,就是設定了一下是否允許退出隊列的值并執行了一條native函數進行了初始化。

MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit();
}
           

  作為構造函數,卻隻有一個native方法,這不禁讓我們好奇這條native語句到底做了什麼事,傳回值mPtr又是什麼呢?先從命名可以看出,學過C/C++的都知道,ptr明顯的就是指針的常用命名習慣,這裡到底是不是就要繼續跟進源碼來分析了。

  對應的native檔案在這裡:android_os_MessageQueue.cpp

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }

    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}
           

  從這個native方法我們可以看出跟我們分析的差不多,首先該方法在native層建立了一個NativeMessageQueue對象,然後将對象位址指針傳回到java層儲存起來。但是到這裡又會有疑惑了,為什麼要在native層也建立一個消息隊列呢?這個消息隊列與java層的消息隊列是同一個東西嗎?

  但是我們是分析MessageQueue的,這些都算是具體實作了,我們當然也會分析,但是要先把MessageQueue走一遍,然後在走的過程再分析具體細節,不然跟蹤源碼那麼多很容易最後迷失在源碼中,是以這裡就現在腦海中留下一個疑問,然後繼續看MessageQueue。

  上面說到構造方法主要是在native層建立了一個NativeMessageQueue對象然後将指針儲存在了MessageQueue的屬性mPtr中。那麼接下來,作為隊列,重要的方法肯定是入隊和出隊的實作了,下面是入隊方法的源碼。

boolean enqueueMessage(Message msg, long when) {
   //不接受沒有目标處理對象的消息入隊(即不收沒有收件人的信件),也不接受正在使用中的消息入隊
    if (msg.target == null) {
        throw new IllegalArgumentException("Message must have a target.");
    }
    if (msg.isInUse()) {
        throw new IllegalStateException(msg + " This message is already in use.");
    }

    synchronized (this) {
    	//若是該隊列已經退出,則直接回收消息,隻有在quit方法中才設定該屬性為true
        if (mQuitting) {
            IllegalStateException e = new IllegalStateException(
                    msg.target + " sending message to a Handler on a dead thread");
            Log.w(TAG, e.getMessage(), e);
            msg.recycle();
            return false;
        }
		//标記正在使用,并設定時間
        msg.markInUse();
        msg.when = when;
        //p指向隊列(單連結清單)的頭部
        Message p = mMessages;//這裡的隊列也是由Message構成的單連結清單
        boolean needWake;
        //按照時間順序,将時間值最小的放在隊首,隊首是最先出隊的
        if (p == null || when == 0 || when < p.when) {
            // New head, wake up the event queue if blocked.
            msg.next = p;
            mMessages = msg;
            needWake = mBlocked;
        } else {
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            //按照時間順序(從小到大)将Message插在對應的位置
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }
        //這裡在插入方法中可以看出,隻要插入的Message不是異步的,那麼
        //needWake的值就會是mBlocked的值,而mBlocked的值會在出隊方
        //法next中,當線程阻塞的時候設為True。而這裡當有非異步的Message入隊時,
        //就會調用nativeWake方法将線程喚醒來處理消息
        if (needWake) {
            nativeWake(mPtr);
        }
    }
    return true;
}
           

  從上面的注釋可以看出,enqueueMessage會按照時間從小到大的順序将消息插入在相應的位置。因為MessageQueue中的隊列是由Message實作的,也就是Message和它的屬性next實作的單連結清單(在上面的Message的對象池中講過),而單連結清單是隻能按照從表頭至表尾的順序通路的,是以在MessageQueue中,入隊是插在表尾,而出隊是從表頭取出的。

  接下來就是比較複雜的出隊方法next的實作了,這個方法比較複雜代碼也比較多,就一點點分析而不是直接全部貼出來了:

Message next() {
   //ptr是native層對象指針,為0時表示MessageQueue已經結束
    final long ptr = mPtr;
    if (ptr == 0) {
        return null;
    }
    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }
        nativePollOnce(ptr, nextPollTimeoutMillis);
    }
    ...
}
           

  可以看到,在next方法中調用了一個沒有結束條件的for循環,那麼肯定在循環内部通過break、return等結束循環,這裡先不管這個結束條件。首先先看nativePollOnce這個方法,因為每次循環的時候都會調用該方法。

  對應的native檔案在這裡:android_os_MessageQueue.cpp

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
     //将java層傳進來的mPtr再轉換成對應的指針
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
           

  可以看到,對應的native方法是直接調用了NativeMessageQueue的pollOnce方法。到這裡,我們将java層的分析先放在一邊,然後分析完native層的實作後再轉回java層。

那麼NativeMessageQueue到底是什麼呢?剛才在nativeInit方法的時候就提到過這個并保留了一些疑問,這裡将具體看一下源碼以便将問題解決:

  對應的native檔案在這裡:android_os_MessageQueue.cpp

class NativeMessageQueue : public MessageQueue, public LooperCallback {
public:
    NativeMessageQueue();
    virtual ~NativeMessageQueue();

    virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);

    void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis);
    void wake();
    void setFileDescriptorEvents(int fd, int events);

    virtual int handleEvent(int fd, int events, void* data);

private:
    JNIEnv* mPollEnv;
    jobject mPollObj;
    jthrowable mExceptionObj;
};
           

  上面是NativeMessageQueue的定義,可以看到除了構造函數和析構函數外,還有一個處理異常資訊的raiseException,一個處理正常資訊的handleEvent,一個設定檔案描述事件監聽的setFileDescriptorEvents,一個使線程進入阻塞的pollOnce,一個喚醒線程的wake方法。

  那麼同樣的,從構造方法看一下:

NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}
           

  從構造方法可以看出,NativeMessageQueue的内部實作了一個Looper,并且這個Looper也是跟線程相關的。

  源碼在這裡:Looper.h

class Looper : public RefBase {
protected:
    virtual ~Looper();

public:
	//定義wait的傳回值
    enum {
        POLL_WAKE = -1,
        POLL_CALLBACK = -2,
        POLL_TIMEOUT = -3,
        POLL_ERROR = -4,
    };
	//定義一些wait事件
    enum {
        EVENT_INPUT = 1 << 0,
        EVENT_OUTPUT = 1 << 1,
        EVENT_ERROR = 1 << 2,
        EVENT_HANGUP = 1 << 3,
        EVENT_INVALID = 1 << 4,
    };

    enum {
        PREPARE_ALLOW_NON_CALLBACKS = 1<<0
    };

    Looper(bool allowNonCallbacks);
    
    bool getAllowNonCallbacks() const;
    //重點關注
    int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
    inline int pollOnce(int timeoutMillis) {
        return pollOnce(timeoutMillis, NULL, NULL, NULL);
    }

	//類似于pollOnce
    int pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData);
    inline int pollAll(int timeoutMillis) {
        return pollAll(timeoutMillis, NULL, NULL, NULL);
    }

	//喚醒線程
    void wake();

	//添加檔案描述
    int addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data);
    int addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data);

    int removeFd(int fd);

	//下面一系列的發送檔案有沒有感覺很像Handler
    void sendMessage(const sp<MessageHandler>& handler, const Message& message);
    void sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
            const Message& message);
    void sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
            const Message& message);
    void removeMessages(const sp<MessageHandler>& handler);
    void removeMessages(const sp<MessageHandler>& handler, int what);

    bool isPolling() const;

	//在Handler中是通過ThreadLocal來實作的
    static sp<Looper> prepare(int opts);
    static void setForThread(const sp<Looper>& looper);
    static sp<Looper> getForThread();

private:
    struct Request {
        int fd;
        int ident;
        int events;
        int seq;
        sp<LooperCallback> callback;
        void* data;
        void initEventItem(struct epoll_event* eventItem) const;
    };

    struct Response {
        int events;
        Request request;
    };

    struct MessageEnvelope {
        MessageEnvelope() : uptime(0) { }

        MessageEnvelope(nsecs_t u, const sp<MessageHandler> h,
                const Message& m) : uptime(u), handler(h), message(m) {
        }

        nsecs_t uptime;
        sp<MessageHandler> handler;
        Message message;
    };

    const bool mAllowNonCallbacks; // immutable

    //用來發出喚醒事件的檔案描述符
    int mWakeEventFd;  // immutable
    ...
};
           

  這裡隻看Looper的定義,具體實作就不看了,需要用到的下面将會再做分析,而其他的隻能看自己的興趣了。不過隻是從定義上我們就可以看出,這裡的Looper有點像java層的Handler。

  這裡是實作Looper的實作的源碼:Looper.cpp

  繼續看NativeMessageQueue,除了構造方法外最重要的就是pollOnce方法了。

  源碼在這裡:android_os_MessageQueue.cpp

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}
           

  可以看到調用了Looper的pollOnce方法。在剛才的Looper的定義Looper.h中我們看到這句。

int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
inline int pollOnce(int timeoutMillis) {
    return pollOnce(timeoutMillis, NULL, NULL, NULL);
}
           

  是以最終調用的四個參數的pollOnce方法,那麼繼續追蹤到Looper.cpp中的實作上:

  源碼在這裡:Looper.cpp

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p",
                        this, ident, fd, events, data);
#endif
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}
           

  可以看到最終調用了pollInner方法,接着往下追蹤,這個方法比較長,這裡在源碼中分出了好幾個段落點,後面會根據這些來進行分析:

  源碼在這裡:Looper.cpp

int Looper::pollInner(int timeoutMillis) {

	//---------------------第一步----------------------------------------------------
	//獲得native下一個消息的時間,與java層逾時時間相比,取最小的時間
    // Adjust the timeout based on when the next message is due.
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
    }

    // Poll.
    int result = POLL_WAKE;
    //清空response資訊,線上程被喚醒後,會将其他事件放入其中,response是個Vector集合
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
    mPolling = true;

    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    //-----------------------第二步------------------------------------
    //這裡是線程進入阻塞的主要方法
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    // Rebuild epoll set if needed.
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }
    //------------------------第三步------------------------------------
	//線程阻塞出錯結果
    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
       result = POLL_ERROR;
        goto Done;
    }
	//線程阻塞逾時結果
    // Check for poll timeout.
    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - timeout", this);
#endif
        result = POLL_TIMEOUT;
        goto Done;
    }

    // Handle all events.
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
	//------------------------------------第四步-------------------------------------
	//線程阻塞通過監聽事件喚醒,然後處理收到的事件
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        //喚醒事件相關的檔案描述符發生的事件
        if (fd == mWakeEventFd) {
            if (epollEvents & EPOLLIN) {
                //清空檔案事件
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        //若是其他事件,則将事件封裝後放入mResponse中(Vector集合)
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
    //Done,什麼都沒做,在epoll_wait逾時或者出錯時跳轉到這裡
Done: ;
	//----------------------------------第五步---------------------------------------
    mNextMessageUptime = LLONG_MAX;
    //處理Looper中Message事件,由Looper發送的,與java層的Message事件一緻,也是按時間排序
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
       //消息時間小于目前時間,也就是說消息已經可以處理了
        if (messageEnvelope.uptime <= now) {
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();
                handler->handleMessage(message);
            } // release handler
            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        //消息還未到可以處理的時候
        } else {
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

	//處理response事件,在上面線程喚醒的時候添加進去的
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    return result;
}
           

  這個pollInner方法稍微有點多,那麼這裡将一點點分析。

  先看第一點,比較的是timeoutMillis 和 mNextMessageUptime,前者是我們從java層一路傳遞進來的逾時時間,後者是native層的Message隊列的下一個事件的時間。mNextMessageUptime在後面設定的,首先若是native層的Message隊列中沒有了待處理的消息,則設其值為LLONG_MAX。若是有待處理的消息,也就是說消息還未到處理的時間,則設為待處理的消息的時間。

  然後呢,根據這個值與目前時間進行計算,算出是待處理消息需要的時間久還是timeoutMillis需要的時間久,然後取最小的,從這裡也可以看出,當native層有消息時也會提前喚醒線程。

  繼續跟着源碼往下走就到了第二點,首先第二點調用了epoll_wait方法。

epoll是Linux核心為處理大批量檔案描述符而作了改進的poll,是Linux下多路複用IO接口select/poll的增強版本,它能顯著提高程式在大量并發連接配接中隻有少量活躍的情況下的系統CPU使用率。

  簡單點說或者說根據本文意圖來解釋的話就是:epoll是linux下的方法,會阻塞線程直到有事件通知。

  那麼這裡就要簡單說明一下epoll了,首先epoll會監聽一些檔案描述符,然後給這些檔案描述添加一些事件,這樣當線程通路的時候就會阻塞,然後待這些檔案描述符發生添加的事件的時候就會喚醒。epoll主要涉及到如下幾個方法:

int epoll_create ( int size );//建立一個epoll監聽
     //對事件進行操作
     int epoll_ctl ( int epfd, int op, int fd, struct epoll_event *event );
     	epfd:epoll描述符,由create建立的
     	op:具體操作
    		EPOLL_CTL_ADD:往事件表中注冊fd上的事件
            EPOLL_CTL_MOD:修改fd上的注冊事件
            EPOLL_CTL_DEL:删除fd上的注冊事件
        fd:要注冊觀察的檔案描述符,可以是具體的檔案,socket,管道等
        event:具體要檢測的事件
        	EPOLLIN:有資料流入,即檔案非空
        	EPOLLOUT:有資料可以寫入,即檔案非滿
     //阻塞等待事件發生
	 int epoll_wait ( int epfd, struct epoll_event* events, int maxevents, int timeout );
         epfd:create建立的epoll描述符
         events:存放事件的數組
         maxevents:最大可存放的事件個數,數組events的大小
         timeout:逾時時間,為0時立即傳回,為-1時一直阻塞,否則等待timeout時間後傳回
           

  可以看到,在第二步時線程可能會根據timeout的值而進行相應時間的阻塞,這裡也是java層的next方法阻塞的實作。當調用epoll_wait方法時,線程就會阻塞在這裡,阻塞timeoutMillis時間,除非發生注冊監聽的事件。

  接着走到第三步,第三步是對eventCount的判斷,而eventCount又是epoll_wait的傳回值,是以這裡要說明一下它的傳回值的差別。首先,epoll_wait有三個可喚醒的條件,逾時,錯誤,事件。當發生錯誤時,傳回值為-1,逾時傳回值為0,發生事件傳回值為事件的個數。

  可以看到,當傳回值為0或-1的時候,隻是設定了result的值POLL_TIMEOUT和POLL_ERROR,然後goto跳轉到Done,而Done什麼都沒做。是以發生逾時和錯誤時是不進行操作隻是修改result值而已。

  那麼當有事件發生的結果呢?這裡要繼續看第四步了,第四步循環所有事件,然後判斷發生事件的是不是專門用來喚醒線程的mWakeEventFd檔案描述符,不是的話則根據對應的事件封裝成Message然後通過pushResponse方法放在mResponse(Vector集合)中;若是的話則通過 awoken()方法清空mWakeEventFd中的資料。

這裡涉及到兩個方法,簡單的看一下吧:

  源碼在這裡:Looper.cpp

void Looper::pushResponse(int events, const Request& request) {
    Response response;
    response.events = events;
    response.request = request;
    mResponses.push(response);
}

//該方法從mWakeEventFd中read()讀取了一個uint64_t的資料,其實就是一個正整數
//之是以隻讀一個數字,是因為在喚醒的方法中,隻寫入了一個資料,是以讀取一個資料就會
//使其再次陷入空狀态
void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ awoken", this);
#endif

    uint64_t counter;
    TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
}
           

  第五步是處理事件,包括兩部分的事件:Looper中的事件和線程wait時傳回的事件。另外,若是Looper中有可以處理的事件或者wait中有事件,會設定result為POLL_CALLBACK。

  到這裡,pollInner已經分析完了,分為五步:1是計算wait時間,2是進行wait等待,3是對wait傳回值的判斷,4也是對wait傳回值的判斷但是是對事件觸發傳回的判斷,5是處理Looper和wait的事件。

  上面我們分析到了java層的next方法,該方法在循環的時候調用了nativePollOnce,然後我們又進入native層分析了一下,總算是nativePollOnce方法弄明白了:陷入阻塞,等待被喚醒。總結起來就是這9個字。接下來繼續看next方法,畢竟這才是我們整個分析的主線:

Message next() {
   ...
    int nextPollTimeoutMillis = 0;
    for (;;) {
        ...
        //根據我們上面的分析,nextPollTimeMillis為0則不阻塞,也就是第一次循環不阻塞
        nativePollOnce(ptr, nextPollTimeoutMillis);

        synchronized (this) {
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            //這裡判斷msg不為空但是target為空,而我們enqueueMessage的時候特意設定了target的
            //是以這裡的msg不是我們設定而是系統在初始化的時候設定的屏障,這裡不再詳解
            if (msg != null && msg.target == null) {
                // Stalled by a barrier.  Find the next asynchronous message in the queue.
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            //這裡是正常情況下的msg
            if (msg != null) {
            	//未達到處理時間的,将會計算需要等待的時間,不超過整形的最大值
                if (now < msg.when) {
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                 //可以處理的則直接取出後傳回
                 } else {
                    // Got a message.
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                    msg.markInUse();
                    return msg;
                }
             //這裡是隊列中沒有消息
            } else {
                nextPollTimeoutMillis = -1;
            }
			//上面分别對消息隊列進行判斷然後修改nextPollTimeoutMillis,而之前的分析可以看出這個值就是線程
			//需要阻塞的時長,有未達到處理時間的消息則阻塞對應時間,沒有消息則一直阻塞直到被喚醒
		...
		}
		...
    }       
}
           

  next也講完了,還剩下一點的就是在enqueueMessage的時候,調用了喚醒線程的方法nativeWake(mPtr),由于前面還沒講到線程是如何在next中阻塞的,是以一直沒分析這個方法,現在我們分析完next後知道了線程是在native層通過epoll_wait阻塞的,那麼喚醒的方式肯定是對相應的檔案描述符的操作。

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}

void NativeMessageQueue::wake() {
    mLooper->wake();
}

void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif

    uint64_t inc = 1;
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",
                    mWakeEventFd, strerror(errno));
        }
    }
}
           

  從上面的nativeWake一路調用到MessageQueue中的wake然後到Looper中的wake。而在Looper中的wake,我們看到,使用write向mWakeEventFd中寫入了一個uint64_t 的1,與我們前面分析pollInner中的對應。在pollInner中,若是由于對應的檔案描述符發生的事件,調用awoke清空檔案,而awoke剛好就是讀取一個uint64_t的值。

補充總結

  MessageQueue的主要方法next和enqueueMessage,其中next方法在隊列為空或者消息暫未達到處理時間的時候,線程會阻塞,這裡的阻塞是通過native層的epoll方式進行的阻塞。enqueueMessage方法在添加Message的同時也會判斷是否需要喚醒線程,若是需要則在native層通過對檔案的寫入資料而觸發epoll設定的事件監聽,是以喚醒線程。

繼續閱讀