天天看點

Chromium多線程模型設計和實作分析

       Chromium除了遠近聞名的多程序架構之外,它的多線程模型也相當引人注目的。Chromium的多程序架構是為了解決網頁的穩定性問題,而多線程模型則是為了解決網頁的卡頓問題。為了達到這個目的,Chromium的多線程模型是基于異步通信的。也就是說,一個線程請求另外一個線程執行一個任務的時候,不需要等待該任務完成就可以去做其它事情,進而避免了卡頓。本文就分析Chromium的多線程模型的設計和實作。

老羅的新浪微網誌:http://weibo.com/shengyangluo,歡迎關注!

《Android系統源代碼情景分析》一書正在進擊的程式員網(http://0xcc0xcd.com)中連載,點選進入!

       有同學看到這裡可能會有疑問,如果一個線程請求另外一個線程執行一個任務的時候,就是要等該任務完成之後才能做其它事情,那麼使用異步通信有什麼用呢?的确如此,但是Chromium提供這種基于異步通信的多線程模型,就是希望作為開發者的你在實作一個子產品的時候,盡最大努力地設計好各個子子產品及其對應的資料結構,使得它們在協作時可以最大程度地進行異步通信。是以,Chromium基于異步通信的多線程模型更多的是展現一種設計哲學。

       一個典型的異步通信過程如圖1所示:

Chromium多線程模型設計和實作分析

圖1  線程異步通信過程

       Task-1被分解成三個子任務Task-1(1)、Task-1(2)和Task-1(3)。其中,Task-1(1)由Thread-1執行。Task-1(1)執行完成後,Thread-1通過我們在前面Chromium多線程通信的Closure機制分析一文分析的Closure請求Thread-2執行Task-1(2)。Task-1(2)執行完成後,Thread-2又通過一個Closure請求Thread-1執行Task-1(3)。至此,Task-1就執行完成。我們可以将第一個Closure看作是一個Request操作,而第二個Closure是一個Reply操作。這是一個典型的異步通信過程。當然,如果不需要知道異步通信結果,那麼第二個Closure和Task-1(3)就是不需要的。

       假設Thread-1需要知道異步通信的結果,那麼在圖1中我們可以看到一個非常關鍵的點:Thread-1并不是什麼也不幹就隻是等着Thread-2執行完成Task-1(2),它趁着這個等待的空隙,幹了另外一件事情——Task-2。如果我們将Thread-1看作是一個UI線程,那麼就意味着這種異步通信模式是可以提高它的響應性的。

       為了能夠完成上述的異步通信過程,一個線程的生命周期如圖2所示:

Chromium多線程模型設計和實作分析

圖2 線程生命周期

       線程經過短暫的啟動之後(Start),就圍繞着一個任務隊列(TaskQueue)不斷地進行循環,直到被通知停止為止(Stop)。在圍繞任務隊列循環期間,它會不斷地檢查任務隊列是否為空。如果不為空,那麼就會将裡面的任務(Task)取出來,并且進行處理。這樣,一個線程如果要請求另外一個線程執行某一個操作,那麼隻需要将該操作封裝成一個任務,并且發送到目标線程的任務隊列去即可。

       為了更好地了解這種基于任務隊列的線程運作模式,我們腦補一下另外一種常用的基于鎖的線程運作模式。一個線程要執行某一個操作的時候,就直接調用一個代表該操作的一個函數。如果該函數需要通路全局資料或者共享資料,那麼就需要進行加鎖,避免其它線程也正在通路這些全局資料或者共享資料。這樣做的一個好處是我們隻需要關心問題的模組化,而不需要關心問題是由誰來執行的,隻要保證邏輯正确并且資料完整即可。當然壞處也是顯然的。首先是為了保持資料完整性,也就是避免通路資料時出現競争條件,代碼裡面充斥着各種鎖。其次,如果多個線程同時擷取同一個鎖,那麼就會産生競争。這種鎖競争會帶來額外的開銷,進而降低線程的響應性。

       基于任務隊列的線程運作模式,要求在對問題進行模組化時,要提前知道誰是執行者。也就是說,在對問題進行模組化時,需要指派好每一個子問題的執行者。這樣我們為子問題設計資料結構時,就規定這些資料結構僅僅會被子問題的執行者通路。這樣執行者在解決指派給它的問題時,就不需要進行加鎖操作,因為在解決問題過程中需要通路的資料不會同時被其它執行者通路。這就是通過任務隊列來實作異步通信的多線程模型的設計哲學。

       當然,這并不是說,基于任務隊列的線程運作模式可以完全避免使用鎖,因為任務隊列本身就是一個線程間的共享資源。想象一下,一個線程要往裡面添加任務,另一個線程要從裡面将任務提取出來處理。是以,所有涉及到任務隊列通路的地方都是需要加鎖的。但是如果我們再仔細想想,那麼就會發現,任務隊列隻是一個基礎設施,它與具體的問題是無關的。是以,隻要我們遵循上述設計哲學,就可以将代碼裡面需要加鎖的地方僅限于通路任務隊列的地方,進而就可以減少鎖競争帶來的額外的開銷。

       這樣說來,似乎基于任務隊列的線程運作模式很好,但是實際上它對問題模組化提出了更高的要求,也就是進行子問題劃分時,要求劃分出來的子問題是正交的,這樣我們才有可能為這些子問題設計出不會同時被通路的資料結構。看到“正交”兩個字,是不是想起高數裡面的向量空間的正交基了?或者傅裡葉變換用到的一組三角函數了?其實道理就是一樣一樣的。

       好了,說了這麼多,我們就步入到正題,分析Chromium多線程模型的設計和實作,也就是基于任務隊列的線程運作模式涉及到核心類圖,如圖3所示:

Chromium多線程模型設計和實作分析

圖3 基于任務隊列的線程運作模式核心類關系圖

       Thread是一個用來建立帶消息循環的類。當我們建立一個Thread對象後,調用它的成員函數Start或者StartWithOptions就可以啟動一個帶消息循環的線程。其中,成員函數StartWithOptions可以指定線程建立參數。當我們不需要這個線程時,就可以調用之前建立的Thread對象的成員函數Stop。

       Thread類繼承了PlatformThread::Delegate類,并且重寫了它的成員函數ThreadMain。我們知道,Chromium是跨平台的,這樣各個平台建立線程使用的API有可能是不一樣的。不過,我們可以通過PlatformThread::Delegate類為各個平台建立的線程提供一個入口點。這個入口點就是PlatformThread::Delegate類的成員函數ThreadMain。由于Thread類重寫了父類PlatformThread::Delegate的成員函數ThreadMain,是以無論是哪一個平台,當它建立完成一個線程後,都會以Thread類的成員函數ThreadMain作為線程的入口點。

       Thread類有一個重要的成員變量message_loop_,它指向的是一個MessageLoop對象。這個MessageLoop對象就是用來描述線程的消息循環的。MessageLoop類内部通過成員變量run_loop_指向的一個RunLoop對象和成員變量pump_指向的一個MessagePump對象來描述一個線程的消息循環。

       一個線程在運作的過程中,可以有若幹個消息循環,也就是一個消息循環可以運作在另外一個消息循環裡面。除了最外層的消息循環,其餘的消息的消息循環稱為嵌套消息循環。我們為什麼需要嵌套消息循環呢?這主要是跟模式對話框有關。

       考慮一個情景,我們在一個視窗彈出一個檔案選擇對話框。視窗必須要等到使用者在檔案選擇對話框選擇了檔案之後,才能去做其它事情。視窗是在消息循環過程中打開檔案對話框的,它要等待使用者在檔案選擇對話框中選擇檔案 ,就意味着消息循環被中止了。由于檔案選擇對話框也是通過消息循環來響應使用者輸入的,是以如果打開的它視窗中止了消息循環,就會導緻它無法響應使用者輸入。為了解決這個問題,就要求打開檔案選擇的視窗不能中止消息循環。方法就是該視窗建立一個子消息循環,該子消息循環負責處理檔案選擇對應框的輸入事件,直到使用者選擇了一個檔案為止。

       MessageLoop類的成員變量run_loop_指向的一個RunLoop對象就是用來記錄線程當使用的消息循環的。RunLoop類有三個重要的成員變量:

       1. message_loop_,記錄一個RunLoop對象關聯的MessageLoop對象。

       2. previous_loop_,記錄前一個消息循環,當就是包含目前消息循環的消息循環。

       3. run_depth_,記錄消息循環的嵌套深度。

       MessageLoop類的成員變量pump_指向的一個MessagePump對象是用來進行消息循環的,也就是說,Thread類描述的線程通過MessagePump類進入到消息循環中去。

       Thread類将消息劃分為三類,分别通過以下三個成員變量來描述:

       1. work_queue_,指向一個TaskQueue對象,用來儲存那些需要馬上處理的消息。

       2. delayed_work_queue_,指向一個DelayedTaskQueue,用來儲存那些需要延遲一段時間再處理的消息。

       3. deferred_non_nestable_work_queue_,指向一個TaskQueue對象,用來儲存那些不能夠在嵌套消息循環中處理的消息。

       一個MessagePump對象在進行消息循環時,如果發現消息隊列中有消息,那麼就需要通知關聯的MessageLoop對象進行處理。通知使用的接口就通過MessagePump::Delegate類來描述。

       MessagePump::Delegate類定義了四個成員函數,如下所示:

       1. DoWork,用來通知MessageLoop類處理其成員變量work_queue_儲存的消息。

       2. DoDelayedWork,用來通知MessageLoop類處理其成員變量delayed_work_queue_儲存的消息。

       3. DoIdleWork,用來通知MessageLoop類目前無消息需要處理,MessageLoop類可以利用該間隙做一些Idle Work。

       4. GetQueueingInformation,用來擷取MessageLoop類内部維護的消息隊列的資訊,例如消息隊列的大小,以及下一個延遲消息的處理時間。

       有了前面的基礎知識,接下來我們就可以大概描述Thread類描述的線程的執行過程。

       首先是線程的啟動過程:

       1. 調用Thread類的成員函數Start或者StartWithOptions啟動一個線程,并且以Thread類的成員函數ThreadMain作為入口點。

       2. Thread類的成員函數ThreadMain負責建立消息循環,也就是通過MessageLoop類建立消息循環。

       3. MessageLoop類在建立消息循環的過程中,會通過成員函數Init建立用來一個用來消息循環的MessagePump對象。

       4. 消息循環建立完成之後,調用MessageLoop類的成員函數Run進入消息循環。

       5. MessageLoop類的成員函數Run建立一個RunLoop對象,并且調用它的成員函數Run進入消息循環。注意,該RunLoop對象在建立的過程,會關聯上目前線程使用的消息循環,也就是建立它的MessageLoop對象。

       6. RunLoop類的成員函數Run負責建立好消息循環的嵌套關系,也就是設定好它的成員變量previous_loop_和run_depth_等,然後就會調用其關聯的MessageLoop對象的成員函數RunHandler進入消息循環。

       7. MessageLoop類的成員函數RunHandler調用成員變量pump_描述的一個MessagePump對象的成員函數Run進入消息循環。

       接下來是向線程的消息隊列發送消息的過程。這是通過MessageLoop類的以下四個成員函數向消息隊列發送消息的:

       1. PostTask,發送需要馬上進行處理的并且可以在嵌套消息循環中處理的消息。

       2. PostDelayedTask,發送需要延遲處理的并且可以在嵌套消息循環中處理的消息。

       3. PostNonNestableTask,發送需要馬上進行處理的并且不可以在嵌套消息循環中處理的消息。

       4. PostNonNestableDelayedTask,發送需要延遲處理的并且不可以在嵌套消息循環中處理的消息。

       向線程的消息隊列發送了新的消息之後,需要喚醒線程,這是通過調用MessagePump類的成員函數Schedule進行的。線程被喚醒之後 ,就會分别調用MessageLoop類重寫父類MessagePump::Delegate的兩個成員函數DoWork和DoDelayedWork對消息隊列的消息進行處理。如果沒有消息可以處理,就調用MessageLoop類重寫父類MessagePump::Delegate的成員函數DoIdleWork通知線程進入Idle狀态,這時候線程就可以做一些Idle Work。

       MessageLoop類的成員函數DoWork在處理消息的過程中,按照以下三個類别進行處理:

       1. 對于可以馬上處理的消息,即儲存在成員變量work_queue_描述的消息隊列的消息,執行它們的成員函數Run。

       2. 對于需要延遲處理的消息,将它們儲存在成員變量delayed_work_queue_描述的消息隊列中,并且調用成員變量pump_指向的一個MessagePump對象的成員函數ScheduleDelayedWork設定最早一個需要處理的延遲消息的處理時間,以便該MessagePump對象可以優化消息循環邏輯。

       3. 對于可以馬上處理但是不可以在嵌套消息循環中處理的消息,如果線程是處理嵌套消息循環中,那麼将它們儲存在成員變量deferred_non_nestable_work_queue_描述的消息隊列中,這些消息将會線上程進入Idle狀态時,并且是處理最外層消息循環時,得到處理。

       以上就是Thread類描述的線程的大概執行過程,接下來我們通過源碼分析較長的描述這些過程。

       我們首先看線程的啟動過程,即Thread類的成員函數Start的實作,如下所示:

bool Thread::Start() {
  Options options;
  ......
  return StartWithOptions(options);
}
           

       這個函數定義在檔案external/chromium_org/base/threading/thread.cc中。

       Thread類的成員函數Start調用另外一個成員函數StartWithOptions來啟動一個線程,後者可以通過一個類型為Options的參數指定線程的啟動參數,這裡沒有指定,意味着采用預設參數啟動一個線程。

       Thread類的成員函數StartWithOptions的實作如下所示:

bool Thread::StartWithOptions(const Options& options) {
  ......

  StartupData startup_data(options);
  startup_data_ = &startup_data;

  if (!PlatformThread::Create(options.stack_size, this, &thread_)) {
    ......
    return false;
  }

  // Wait for the thread to start and initialize message_loop_
  base::ThreadRestrictions::ScopedAllowWait allow_wait;
  startup_data.event.Wait();

  // set it to NULL so we don't keep a pointer to some object on the stack.
  startup_data_ = NULL;
  started_ = true;

  ......
  return true;
}
           

       這個函數定義在檔案external/chromium_org/base/threading/thread.cc中。

       Thread類的成員函數StartWithOptions首先是将線程啟動參數封裝一個在棧上配置設定的StartupData對象中,并且這個StartupData對象的位址會儲存在Thread類的成員變量startup_data_中。接下來再調用由平台實作的PlatformThread類的靜态成員函數Create建立一個線程。最後通過上面封裝的StartupData對象的成員變量event描述的一個WaitableEvent對象等待上述建立的線程啟動完成。

       一般情況下,線程是不可以進入等待狀态的,因為這樣會降低線程的響應性。但是有時候線程不得不進入等待狀态,例如現在這個情況,目前線程必須要等新建立的線程啟動完成之後才能傳回,否則的話有可能新建立的線程還沒有啟動完成,前面在棧上配置設定的StartupData對象就已經被釋放,這樣會導緻新建立的線程無法通路它的啟動參數。

       當新建立的線程啟動完成之後,就會通過上述的WaitableEvent對象喚醒目前線程,目前線程将Thread類的成員變量startup_data_置為NULL,避免它引用一個即将無效的在棧上配置設定的StartupData對象,并且将Thread類的成員變量started_的值設定為true,表示新建立的線程已經啟動完畢。

       接下來我們繼續分析PlatformThread類的靜态成員函數Create的實作。以Android平台為例,它的實作如下所示:

bool PlatformThread::Create(size_t stack_size, Delegate* delegate,
                            PlatformThreadHandle* thread_handle) {
  base::ThreadRestrictions::ScopedAllowWait allow_wait;
  return CreateThread(stack_size, true /* joinable thread */,
                      delegate, thread_handle, kThreadPriority_Normal);
}
           

      這個函數定義在檔案external/chromium_org/base/threading/platform_thread_posix.cc中。

      PlatformThread類的靜态成員函數Create調用了另外一個函數CreateThread來建立一個線程,後者的實作如下所示:

bool CreateThread(size_t stack_size, bool joinable,
                  PlatformThread::Delegate* delegate,
                  PlatformThreadHandle* thread_handle,
                  ThreadPriority priority) {
  ......

  bool success = false;
  pthread_attr_t attributes;
  pthread_attr_init(&attributes);

  // Pthreads are joinable by default, so only specify the detached
  // attribute if the thread should be non-joinable.
  if (!joinable) {
    pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_DETACHED);
  }

  // Get a better default if available.
  if (stack_size == 0)
    stack_size = base::GetDefaultThreadStackSize(attributes);

  if (stack_size > 0)
    pthread_attr_setstacksize(&attributes, stack_size);

  ThreadParams params;
  params.delegate = delegate;
  params.joinable = joinable;
  params.priority = priority;
  params.handle = thread_handle;

  pthread_t handle;
  int err = pthread_create(&handle,
                           &attributes,
                           ThreadFunc,
                           &ms);
  success = !err;
  ......

  pthread_attr_destroy(&attributes);

  .....

  return success;
}
           

       這個函數定義在檔案external/chromium_org/base/threading/platform_thread_posix.cc中。

       從這裡就可以看到,Android平台調用POSIX線程庫中的函數pthread_create建立了一個線程,并且指定新建立的線程的入口點函數為ThreadFunc,同時傳遞給該入口點函數的參數為一個ThreadParams對象,該ThreadParams對象封裝了線程啟動過程中需要使用到的一系列參數。    

       新建立線程的入口點函數ThreadFunc的實作如下所示:

void* ThreadFunc(void* params) {
  ......
  ThreadParams* thread_params = static_cast<ThreadParams*>(params);

  PlatformThread::Delegate* delegate = thread_params->delegate;
  ......

  delegate->ThreadMain();

  ......
  return NULL;
}
           

       這個函數定義在檔案external/chromium_org/base/threading/platform_thread_posix.cc中。

       函數ThreadFunc首先将參數params轉換為一個ThreadParams對象。有了這個ThreadParams對象之後,就可以通過它的成員變量delegate獲得一個PlatformThread::Delegate對象。從前面的調用過程可以知道,這個PlatformThread::Delegate對象實際上是一個Thread對象,用來描述新建立的線程。得到了用來描述新建立線程的Thread對象之後,就可以調用它的成員函數ThreadMain繼續啟動線程了。

       Thread類的成員函數ThreadMain的實作如下所示:

void Thread::ThreadMain() {
  {
    ......
    scoped_ptr<MessageLoop> message_loop;
    if (!startup_data_->options.message_pump_factory.is_null()) {
      message_loop.reset(
          new MessageLoop(startup_data_->options.message_pump_factory.Run()));
    } else {
      message_loop.reset(
          new MessageLoop(startup_data_->options.message_loop_type));
    }

    ......

    message_loop_ = message_loop.get();

    Init();

    running_ = true;
    startup_data_->event.Signal();

    ......

    Run(message_loop_);
    running_ = false;

    ......

    message_loop_ = NULL;
  }
}
           

       這個函數定義在檔案external/chromium_org/base/threading/thread.cc中。

       回憶前面分析的Thread類的成員函數StartWithOptions,它已經将用來描述線程啟動參數的一個Options對象儲存在成員變量startup_data_描述的一個StartupData對象中,是以我們就可以重新擷取這個Options對象。

       當Options類的成員變量message_pump_factory不等于NULL時,就表示新建立線程使用的Message Pump通過該成員變量描述的一個Callback對象來建立,也就是調用該Callback對象的成員函數Run來建立。關于Chromium的Callback機制,可以參考前面Chromium多線程通信的Closure機制分析一文。有了Message Pump之後,就可以建立一個Message Loop了。該Message Loop最終會儲存在Thread類的成員變量message_loop_中。

       一般我們不通過Options類的成員變量message_pump_factory來建立Message Pump,而是通過另外一個成員變量message_loop_type來建立指定Message Loop的類型 ,進而确定要建立的Message Pump,這些邏輯都封裝在MessageLoop類的構造函數中。

       建立好Message Loop之後,線程的啟動工作就完成了,接下來新建立的線程就需要進入到初始化狀态,這是通過調用Thread類的成員函數Init實作的。Thread類的成員函數Init一般由子類重寫,這樣子類就有機會執行一些線程初始化工作。

       再接下來,新建立的線程就需要進入運作狀态,這是通過調用Thread類的成員函數Run實作的。不過在新建立線程進入運作狀态之前,還會做兩件事情。第一件事情是将Thread類的成員變量running_設定為true,表示新建立的線程正在運作。第二件事情是通過Thread類的成員變量startup_data_指向的一個StartupData對象的成員變量event描述的一個WaitableEvent喚醒請求建立新線程的線程。

       最後,當Thread類的成員函數Run執行完成傳回後,需要将Thread類的成員變量running_和message_loop_分别重置為false和NULL,表示新建立的線程已經運作結束了,是以就不再需要Message Loop了。

       接下來我們首先分析線程的Message Loop的建立過程,也就是MessageLoop類的構造函數的實作,以完成線程的啟動過程,然後再分析線程的運作過程,也就是Thread類的成員函數Run的實作。

       我們假設線程的Message Loop是通過Message Loop Type來建立的,對應的MessageLoop類構造函數的實作如下所示:

MessageLoop::MessageLoop(Type type)
    : type_(type),
      nestable_tasks_allowed_(true),
      ......
      run_loop_(NULL) {
  Init();

  pump_ = CreateMessagePumpForType(type).Pass();
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc。 

       MessageLoop類的成員變量type_描述的是消息循環的類型,nestable_tasks_allowed_描述目前是否允許處理嵌套消息,runn_loop_描述的是目前使用的消息循環。 

       MessageLoop類構造函數首先是調用成員函數Init執行初始化工作,接着再調用成員函數CreateMessagePumpForType根據消息循環的類型建立一個Message Pump。接下來我們就分别分析這兩個成員函數的實作。

       MessageLoop類的成員函數Init的實作如下所示:

LazyInstance<base::ThreadLocalPointer<MessageLoop> >::Leaky lazy_tls_ptr =
    LAZY_INSTANCE_INITIALIZER;

......

void MessageLoop::Init() {
  ......
  lazy_tls_ptr.Pointer()->Set(this);

  incoming_task_queue_ = new internal::IncomingTaskQueue(this);
  message_loop_proxy_ =
      new internal::MessageLoopProxyImpl(incoming_task_queue_);
  thread_task_runner_handle_.reset(
      new ThreadTaskRunnerHandle(message_loop_proxy_));
}
           

      這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

      MessageLoop類的成員函數Init首先将目前建立的MessageLoop對象儲存在全局變量lazy_tls_ptr指向一塊線程局部存儲中,這樣我們就可以通過MessageLoop類的靜态成員函數current獲得目前線程的消息循環,如下所示:

MessageLoop* MessageLoop::current() {
  ......
  return lazy_tls_ptr.Pointer()->Get();
}
           

      這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

      回到MessageLoop類的成員函數Init中,接下來它建立了一個任務隊列,并且儲存在成員變量incoming_queue_中。這個任務隊列通過IncomingQueue類來描述,它的定義如下所示:

class BASE_EXPORT IncomingTaskQueue
    : public RefCountedThreadSafe<IncomingTaskQueue> {
 public:
  ......

  bool AddToIncomingQueue(const tracked_objects::Location& from_here,
                          const Closure& task,
                          TimeDelta delay,
                          bool nestable);

  ......
 
  void ReloadWorkQueue(TaskQueue* work_queue);

  ......

  void WillDestroyCurrentMessageLoop();
 
  ......

 private:
  ......

  TaskQueue incoming_queue_;

  ......

  MessageLoop* message_loop_;

  ......
};
           

       這個類定義在external/chromium_org/base/message_loop/incoming_task_queue.h中。

       IncomingQueue類有兩個重要的成員變量:

       1. incoming_queue_,它描述的是一個TaskQueue,代表的是線程的消息隊列,也就是所有發送給線程的消息都儲存在這裡。

       2. message_loop_,它指向一個MessageLoop對象,描述的是線程的消息循環。

       IncomingQueue類有三個重要的成員函數:

       1. AddToIncomingQueue,用來向成員變量incoming_queue_描述的消息隊列發送一個消息,并且喚醒線程進行處理。

       2. ReloadWorkQueue,用來提取成員變量incoming_queue_描述的消息隊列中的消息,并且儲存在參數work_queue中。

       3. WillDestroyCurrentMessageLoop,當該函數被調用時,會将成員變量message_loop_的值設定為NULL,使得我們不能夠再向線程發送消息,也就是請求線程執行某一個操作。

       IncomingQueue類的上述成員變量和成員函數我們後面分析消息的發送和處理再詳細分析。現在傳回到MessageLoop類的成員函數Init中,它接下來建立了一個MessageLoopProxyImpl對象和一個ThreadTaskRunnerHandle對象,分别儲存在成員變量message_loop_proxy_和thread_task_runner_handle中,前者封裝了目前線程的消息隊列,後者又封裝了前者。它們與MessageLoop類一樣,都是可以用來向線程的消息隊列發送消息,這意味着我們有三種方式向線程的消息隊列發送消息,後面分析消息的發送過程時我們再詳細分析。

       MessageLoop類的成員函數Init執行完成後,回到MessageLoop類的構造函數中,接下來它調用另外一個成員函數CreateMessagePumpForType根據消息循環的類型建立一個消息泵(Message Pump),并且儲存在成員變量pump_中。

       MessageLoop類的成員函數CreateMessagePumpForType的實作如下所示:

#if defined(OS_IOS)
typedef MessagePumpIOSForIO MessagePumpForIO;
#elif defined(OS_NACL)
typedef MessagePumpDefault MessagePumpForIO;
#elif defined(OS_POSIX)
typedef MessagePumpLibevent MessagePumpForIO;
#endif
......

scoped_ptr<MessagePump> MessageLoop::CreateMessagePumpForType(Type type) {
  ......

#if defined(OS_IOS) || defined(OS_MACOSX)
#define MESSAGE_PUMP_UI scoped_ptr<MessagePump>(MessagePumpMac::Create())
#elif defined(OS_NACL)
// Currently NaCl doesn't have a UI MessageLoop.
// TODO(abarth): Figure out if we need this.
#define MESSAGE_PUMP_UI scoped_ptr<MessagePump>()
#else
#define MESSAGE_PUMP_UI scoped_ptr<MessagePump>(new MessagePumpForUI())
#endif

  if (type == MessageLoop::TYPE_UI) {
    if (message_pump_for_ui_factory_)
      return message_pump_for_ui_factory_();
    return MESSAGE_PUMP_UI;
  }
  if (type == MessageLoop::TYPE_IO)
    return scoped_ptr<MessagePump>(new MessagePumpForIO());

#if defined(OS_ANDROID)
  if (type == MessageLoop::TYPE_JAVA)
    return scoped_ptr<MessagePump>(new MessagePumpForUI());
#endif

  ......
  return scoped_ptr<MessagePump>(new MessagePumpDefault());
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

       上面的代碼通過一系列宏來适配不同的平台,這裡我們隻考慮Android平台,這意味着MessagePumpForIO定義為MessagePumpLibevent,MESSAGE_PUMP_UI定義為scoped_ptr<MessagePump>(new MessagePumpForUI())。

       從MessageLoop類的成員函數CreateMessagePumpForType的實作可以知道:

       1. 如果消息循環的類型為MessageLoop::TYPE_UI,那麼對應的消息泵為MessagePumpForUI,或者由函數指針message_pump_for_ui_factory_指向的函數建立。但是一般不設定函數指針message_pump_for_ui_factory_,是以,類型為MessageLoop::TYPE_UI的消息循環對應的消息泵為MessagePumpForUI。在Chromium中,消息循環類型為MessageLoop::TYPE_UI的線程稱為UI線程,也就是應用程式的主線程。

       2. 如果消息循環的類型為MessageLoop::TYPE_IO,那麼對應的消息泵為MessagePumpForIO,即MessagePumpLibevent。在Chromium中,消息循環類型為MessageLoop::TYPE_IO的線程稱為IO線程,但是這裡的IO不是讀寫檔案的意思,而是執行IPC的意思。

       3. 如果消息循環的類型為MessageLoop::TYPE_JAVA,那麼對應的消息泵為MessagePumpForUI。在Chromium中,消息循環類型為MessageLoop::TYPE_JAVA的線程稱為JAVA線程,它們與UI線程一樣,在JAVA層具有自己的消息循環。

       4. 其餘類型的消息循環,對應的消息泵為MessagePumpDefault。

       總結來說,就是在Android平台上,涉及到的消息泵有MessagePumpForUI、MessagePumpForIO和MessagePumpDefault三種,各自有不同的用途,其中MessagePumpForUI适用于在Java層具有自己的消息循環的UI線程和Java線程,MessagePumpLibevent适用于用來負責執行IPC的IO線程,MessagePumpDefault适用于其它的一般線程。我們先從一般性出發,分析MessagePumpDefault的實作,後面再分析MessagePumpForUI和MessagePumpForIO的實作。

       MessagePumpDefault類繼承于MessagePump類,它的定義如下所示:

class MessagePumpDefault : public MessagePump {
 public:
  MessagePumpDefault();
  virtual ~MessagePumpDefault();

  // MessagePump methods:
  virtual void Run(Delegate* delegate) OVERRIDE;
  virtual void Quit() OVERRIDE;
  virtual void ScheduleWork() OVERRIDE;
  virtual void ScheduleDelayedWork(const TimeTicks& delayed_work_time) OVERRIDE;

 private:
  // This flag is set to false when Run should return.
  bool keep_running_;

  // Used to sleep until there is more work to do.
  WaitableEvent event_;

  // The time at which we should call DoDelayedWork.
  TimeTicks delayed_work_time_;

  DISALLOW_COPY_AND_ASSIGN(MessagePumpDefault);
};
           

       這個類定義在檔案external/chromium_org/base/message_loop/message_pump_default.h 。

       MessagePumpDefault類重寫了父類MessagePump的成員函數Run、Quit、ScheduleWork和ScheduleDelayedWork,後面我們分析消息循環的執行過程和消息的發送過程時,會看到它們的具體實作。

       MessagePumpDefault類具有三個成員變量:

       1. keep_running_,類型為bool,表示消息循環是否需要繼續執行。隻要線程不退出,消息循環就要持續執行。

       2. event_,類型為WaitableEvent,表示一個可以進行Wait/Wake的事件變量。當線程的消息隊列為空時,線程就通過它進入到Wait狀态,而當向線程的消息隊列發送了一個消息時,就通過它喚醒線程。

       3. delayed_work_time_,類型為TimeTicks,表示線程進入Wait狀态的逾時時間。達到逾時時間之後,線程就會自動喚醒,然後處理那些延遲消息。

       這樣,一個消息循環及其對應的消息泵就建立完畢,回到Thread類的成員函數ThreadMain中,接下來它調用成員函數Run使得線程進入到運作狀态,也就是圍繞消息隊列進行不斷的循環,直到線程退出為止。

       Thread類的成員函數Run的實作如下所示:

void Thread::Run(MessageLoop* message_loop) {
  message_loop->Run();
}
           

       這個函數定義在檔案external/chromium_org/base/threading/thread.cc中。

       Thread類的成員函數Run調用參數message_loop指向的一個MessageLoop對象的成員函數Run使得線程進入運作狀态。

       MessageLoop類的成員函數Run的實作如下所示:

void MessageLoop::Run() {
  RunLoop run_loop;
  run_loop.Run();
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

       MessageLoop類的成員函數Run在棧上建立了一個RunLoop對象,然後通過調用該RunLoop對象的成員函數Run使得線程進入運作狀态。

       前面提到,RunLoop的作用是用來建立消息循環的層次關系的,主要是通過它的兩個成員變量previous_run_loop_和run_depth_來實作,此外,它還有一個成員變量loop_,用來關聯它所對應的消息循環。

       RunLoop類的上述三個成員變量的定義如下所示:

class BASE_EXPORT RunLoop {
public:
  ......

 private:
  ......

  MessageLoop* loop_;

  // Parent RunLoop or NULL if this is the top-most RunLoop.
  RunLoop* previous_run_loop_;

  ......

  // Used to count how many nested Run() invocations are on the stack.
  int run_depth_;

  ......
};
           

      這三個成員變量定義在檔案external/chromium_org/base/run_loop.h中。

      它們在RunLoop類的構造函數被初始化,如下所示:

RunLoop::RunLoop()
    : loop_(MessageLoop::current()),
      previous_run_loop_(NULL),
      run_depth_(0),
      ...... {
  ......
}
           

       這個函數定義在檔案external/chromium_org/base/run_loop.cc中。

       從這裡我們就可以看到,一個RunLoop關聯的消息循環就是目前線程使用的消息循環。這個消息循環可以通過調用前面提到的MessageLoop類的靜态成員函數current獲得。

       RunLoop類的成員變量previous_run_loop_和run_depth_分别被初始化為NULL和0,表示還沒有建立好層次關系,但是當RunLoop類的成員函數Run被調用時,它們就會被設定,進而形成層次關系。

       從前面的調用過程可以知道,RunLoop類的成員函數Run在MessageLoop類的成員函數Run中調用,它的實作如下所示:

void RunLoop::Run() {
  if (!BeforeRun())
    return;
  loop_->RunHandler();
  AfterRun();
}
           

       這個函數定義在檔案external/chromium_org/base/run_loop.cc中。

       在調用成員變量loop_指向的一個MessageLoop對象的成員函數RunHandler進入消息循環前後,RunLoop類的成員函數Run分别調用了BeforeRun和AfterRun兩個成員函數,目的就是為了建立好消息循環的層次關系,它們的實作如下所示:

bool RunLoop::BeforeRun() {
  DCHECK(!run_called_);
  run_called_ = true;

  // Allow Quit to be called before Run.
  if (quit_called_)
    return false;

  // Push RunLoop stack:
  previous_run_loop_ = loop_->run_loop_;
  run_depth_ = previous_run_loop_? previous_run_loop_->run_depth_ + 1 : 1;
  loop_->run_loop_ = this;

  running_ = true;
  return true;
}

void RunLoop::AfterRun() {
  running_ = false;

  // Pop RunLoop stack:
  loop_->run_loop_ = previous_run_loop_;

  // Execute deferred QuitNow, if any:
  if (previous_run_loop_ && previous_run_loop_->quit_called_)
    loop_->QuitNow();
}
           

       這兩個函數定義在檔案external/chromium_org/base/run_loop.cc中。

       MessageLoop類的成員變量run_loop_記錄的是消息循環目前使用的Run Loop,是以,RunLoop類的成員函數BeforeRun會将目前正在處理的RunLoop對象記錄在其成員變量loop_指向的一個MessageLoop對象的成員變量run_loop_中,而該MessageLoop對象的成員變量run_loop_原來指向的RunLoop對象則記錄在目前正在處理的RunLoop對象的成員變量previous_run_loop_中,進而就形成一個Run Loop調用棧。此外,第一個Run Loop的Run Depth被設定為1,後面的Run Loop的Run Depth依次增加1。

       從上面的分析就可以看出,RunLoop類的成員函數BeforeRun執行的是一個Run Loop入棧操作,相應地,RunLoop類的成員函數AfterRun執行的是一個Run Loop出棧操作,它将消息循環目前使用的Run Loop恢複為前一個Run Loop。

       RunLoop類的成員變量running_描述的是一個Run Loop目前是否正在被消息循環使用,是以,在RunLoop類的成員函數BeforeRun和AfterRun中,它的值分别被設定為true和false。

       RunLoop類的成員變量quit_called_描述的是一個Run Loop是否收到退出請求。如果一個Run Loop目前正在消息循環使用,并且又收到了退出請求,那麼就将會導緻消息循環退出。這樣就會導緻以下兩種情況:

       1. 一個Run Loop在即将被消息循環使用之前,就已經收到了退出請求,那麼就不會被消息循環使用,表現就為在RunLoop類的成員函數BeforeRun中,如果目前正在處理的RunLoop對象的成員變量quit_called_的值等于true,那麼就傳回一個false值給調用者,表示目前正在處理的RunLoop對象不能夠進入消息循環。

       2. 一個Run Loop在被消息循環使用期間,前一個Run Loop收到了退出請求,那麼目前Run Loop結束使用之後,禁止傳回到前一個Run Loop。這意味着要結束消息循環,表現就為在RunLoop類的成員函數AfterRun中,如果發現目前正在處理的RunLoop對象的成員變量previous_run_loop_不為NULL,并且它指向的一個RunLoop對象的成員變量quit_called的值被設定為true,那麼就會調用目前正在處理的RunLoop對象的成員變量loop_指向的一個MessageLoop對象的成員函數QuitNow退出消息循環。

       回到RunLoop類的成員函數Run中,在調用成員函數BeforeRun成功建立好消息循環的層次關系之後,就通過目前正在處理的RunLoop對象進入到下一層消息循環中,這是通過調用目前正在處理的RunLoop對象的成員變量loop_指向的一個MessageLoop對象的成員函數RunHandler實作的。從前面的分析可以知道,該MessageLoop對象描述的是就是目前線程使用的消息循環。

       MessageLoop類的成員函數RunHandler的實作如下所示:

void MessageLoop::RunHandler() {
  ......

  pump_->Run(this);
}
           

      這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

      MessageLoop類的成員函數RunHandler通過調用成員變量pump_指向的一個MessagePump對象的成員函數Run進入消息循環。前面我們假設該MessagePump對象是一個MessagePumpDefault對象,是以接下來我們繼續分析MessagePumpDefault類的成員函數Run的實作,如下所示:

void MessagePumpDefault::Run(Delegate* delegate) {
  ......

  for (;;) {
    ......

    bool did_work = delegate->DoWork();
    if (!keep_running_)
      break;

    did_work |= delegate->DoDelayedWork(&delayed_work_time_);
    if (!keep_running_)
      break;

    if (did_work)
      continue;

    did_work = delegate->DoIdleWork();
    if (!keep_running_)
      break;

    if (did_work)
      continue;

    ThreadRestrictions::ScopedAllowWait allow_wait;
    if (delayed_work_time_.is_null()) {
      event_.Wait();
    } else {
      TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
      if (delay > TimeDelta()) {
        event_.TimedWait(delay);
      } else {
        // It looks like delayed_work_time_ indicates a time in the past, so we
        // need to call DoDelayedWork now.
        delayed_work_time_ = TimeTicks();
      }
    }
    // Since event_ is auto-reset, we don't need to do anything special here
    // other than service each delegate method.
  }

  keep_running_ = true;
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_pump_default.cc中。

       參數delegate是一個Delegate類型的指針,但是從上面的調用過程可以知道,它指向的是一個MessageLoop對象。

       MessagePumpDefault類的成員函數Run在一個for循環中不斷地通過調用參數delegate指向的一個MessageLoop對象的成員函數DoWork和DoDelayedWork檢查線程的消息隊列是否有任務需要處理。如果沒有,再調用該MessageLoop對象的成員函數DoIdleWork處理一些适用線上程空閑時進行的任務。

       MessageLoop類的成員函數DoWork、DoDelayedWork和DoIdleWork的傳回值均為一個布爾值。當這個布爾值等于true的時候,就表示線程處理了一些任務。在這種情況下,就需要重新執行一遍for循環,這是因為上述三個函數在處理任務的過程中,可能又往線程的消息隊列發送了新的任務,是以需要for循環檢查進行檢查,以及新發送的任務能夠得到及時處理。

       另一方面,如果MessageLoop類的成員函數DoWork、DoDelayedWork和DoIdleWork的傳回值均為false,那就表示線程目前實在是無事可做。這時候就不适合重新執行一遍for循環,是以這會使得線程在空轉。在這種情況下,最好的方式就是讓線程進入睡眠狀态,以便将CPU釋放出來。那麼線程什麼時候需要喚醒呢?

       在兩種情況下,線程需要從睡眠狀态喚醒過來。第一種情況是線程的消息隊列有新的消息加入的時候,這時候由發送消息的線程進行喚醒。第二種情況是,線程有一個延時消息需要處理,那麼當系統達到該消息的處理時間時,線程就需要自動喚醒過來。

       如果線程有一個延時消息需要處理,那麼MessagePumpDefault類的成員變量delayed_work_time_就表示該消息在将來執行的時間點。注意,如果線程具有多個延時消息,那麼MessagePumpDefault類的成員變量delayed_work_time_描述的是最早的延時點,這時候線程最多就隻能睡眠到該時間點,然後自動喚醒過來。還有一點需要注意的是,如果最早的延時點小于系統的目前時間,那麼線程就不可以睡眠,而要馬上重新執行for循環,以便可以對已經超過了時間點處理的消息進行處理。如果線程沒有延時消息需要處理,那麼線程就不會設定自動喚醒時間,而是一直處理睡眠狀态,直到被其它線程喚醒為止。

       無論線程是通過哪一種情況下進行睡眠狀态,都是通過MessagePumpDefault類的成員變量event_描述的一個WaitableEvent對象進行,即通過調用它的成員函數Wait和TimedWait進行的。

       WaitableEvent是有效地實作線程消息循環的一個重要類。通過WaitableEvent類,線程可以在無消息處理時進入睡眠狀态,并且在有消息處理時從睡眠狀态喚醒過來,進而避免了不斷地輪循消息隊列是否有消息處理的操作。因為消息隊列可能在大多數情況下都是空的,對它進行不斷輪循将會浪費CPU周期。

       因為WaitableEvent類是如此重要,是以接下來我們先分析它的實作,然後再繼續分析線程處理消息的過程,也就是MessageLoop類的成員函數DoWork、DoDelayedWork和DoIdleWork的實作。

       WaitableEvent類的定義如下所示:

class BASE_EXPORT WaitableEvent {
 public:
  ......
  WaitableEvent(bool manual_reset, bool initially_signaled);
  ......

  void Signal();
  ......
  void Wait();
  ......
  bool TimedWait(const TimeDelta& max_time);

  ......

  class Waiter {
   public:
    ......
    virtual bool Fire(WaitableEvent* signaling_event) = 0;
    ......
    virtual bool Compare(void* tag) = 0;
    ......
  };

  ......

  struct WaitableEventKernel :
      public RefCountedThreadSafe<WaitableEventKernel> {
   public:
    WaitableEventKernel(bool manual_reset, bool initially_signaled);

    bool Dequeue(Waiter* waiter, void* tag);

    base::Lock lock_;
    const bool manual_reset_;
    bool signaled_;
    std::list<Waiter*> waiters_;

    ......
  };

  ......

  bool SignalAll();
  bool SignalOne();
  void Enqueue(Waiter* waiter);

  scoped_refptr<WaitableEventKernel> kernel_;
  ......
};
           

       這個類定義在檔案external/chromium_org/base/synchronization/waitable_event.h中。

       這裡我們隻讨論Android平台相關的實作。WaitableEvent類提供兩個最基本的功能:Wait和Signal。Wait操作使得線程進入睡眠狀态,而Signal操作使得線程從睡眠狀态喚醒過來。

       在WaitableEvent類中,Wait操作對應的兩個成員函數為Wait和TimedWait。前者使得線程一直處理喚醒狀态,直到被其它線程喚醒為止,而後者使得線程進入到睡眠狀态的時間為有限時間,并且在超過該時間後,線程自動喚醒。

       在WaitableEvent類中,Signal操作對應的成員函數為Signal,内部通成員函數SignalAll和SignalOne實作。前者喚醒所有的等待者,而後者隻喚醒其中一個等待者。

       等待者通過内部類Waiter描述,它有Fire和Compare兩個成員函數。一個Waiter需要喚醒時,它的成員函數Fire就會被調用。Waiter類的成員函數Compare用來比較一個Waiter與另外一個Waiter是否相同。

       一個WaitableEvent可以有若幹個Waiter,這些Waiter通過WaitableEvent類的成員函數Enqueue加入到成員變量kernel_指向的一個WaitableEventKernel對象的成員變量waiters_描述的一個清單中。

       WaitableEventKernel類除了上述的成員變量waiters_之外,還具有以下三個成員變量:

       1. lock_,一個互斥鎖,用來保護成員變量waiters_的并發通路。

       2. manual_reset_,一個布爾變量,用來表示一個WaitableEvent被喚醒的時候,是否需要手動設定才變為Signaled狀态。

       3. signaled_,一個布爾變量,用來表示一個WaitableEvent是否處于Signaled狀态。

      上述三個成員變量以及成員變量waiters_都是用來描述一個WaitableEvent的狀态的。為什麼不将這些成員變量直接作為WaitableEvent類的成員變量呢?這是為了模拟Windows系統的HANDLE語意的。在Windows平台,一個描述WaitableEvent對象的HANDLE處理等待狀态時,是可以關閉的,即可以被Close。Windows平台認為這種情況會出現未定義行為,但是不會導緻程式Crash。Android平台的WaitableEvent為具有這樣的語意,就将描述WaitableEvent狀态的成員變量儲存一個WaitableEventKernel對象中,然後通過一個scoped_refptr智能指針kernel_引用它。這樣,當一個WaitableEvent被Close時,它本身是被銷毀了,但是它的成員變量kernel_指向的WaitableEventKernel對象卻未必會被銷毀,這取決于其宿主WaitableEvent的使用情況。例如,如果這個WaitableEventKernel對象同時也被另外一個scoped_refptr智能指針引用時,由于它的引用計數大于1,那麼它就不會被銷毀。這意味着在我們可以有一種方式,使得一個WaitableEvent被銷毀時,我們仍然可以通過其成員變量kernel_描述的WaitableEventKernel對象操作該WaitableEvent,而且可以避免程式Crash。

        我們先看WaitableEvent類的構造函數的實作,如下所示:

WaitableEvent::WaitableEvent(bool manual_reset, bool initially_signaled)
: kernel_(new WaitableEventKernel(manual_reset, initially_signaled)) {
}
           

       這個函數定義在檔案external/chromium_org/base/synchronization/waitable_event_posix.cc中。

       WaitableEvent的構造函數主要就是建立了一個WaitableEventKernel對象,并且儲存在成員變量kernel_中。

       我們接下來繼續分析WaitableEvent類的成員函數Wait的實作,如下所示:

void WaitableEvent::Wait() {
bool result = TimedWait(TimeDelta::FromSeconds(-1));
......
}
           

       這個函數定義在檔案external/chromium_org/base/synchronization/waitable_event_posix.cc中。

       WaitableEvent類的成員函數Wait通過調用另外一個成員函數TimedWait使得線程進入睡眠狀态,并且指定進入睡眠狀态的時間為-1,即無限地進入睡眠狀态,直到被其它線程喚醒為止。

       WaitableEvent類的成員函數TimedWait的實作如下所示:所示:

bool WaitableEvent::TimedWait(const TimeDelta& max_time) {
......
const TimeTicks end_time(TimeTicks::Now() + max_time);
const bool finite_time = max_time.ToInternalValue() >= 0;
kernel_->lock_.Acquire();
if (kernel_->signaled_) {
if (!kernel_->manual_reset_) {
// In this case we were signaled when we had no waiters. Now that
// someone has waited upon us, we can automatically reset.
kernel_->signaled_ = false;
}
kernel_->lock_.Release();
return true;
}
SyncWaiter sw;
sw.lock()->Acquire();
Enqueue(&sw);
kernel_->lock_.Release();
for (;;) {
const TimeTicks current_time(TimeTicks::Now());
if (sw.fired() || (finite_time && current_time >= end_time)) {
const bool return_value = sw.fired();
......
sw.lock()->Release();
kernel_->lock_.Acquire();
kernel_->Dequeue(&sw, &sw);
kernel_->lock_.Release();
return return_value;
}
if (finite_time) {
const TimeDelta max_wait(end_time - current_time);
sw.cv()->TimedWait(max_wait);
} else {
sw.cv()->Wait();
}
}
}
           

       這個函數定義在檔案external/chromium_org/base/synchronization/waitable_event_posix.cc中。

       我們分段來閱讀WaitableEvent類的成員函數TimedWait的代碼。

       第一段代碼如下所示:

const TimeTicks end_time(TimeTicks::Now() + max_time);
const bool finite_time = max_time.ToInternalValue() >= 0;
           

      第一行代碼計算線程進入睡眠狀态的結束時間,儲存在變量end_time中。第二行代碼判斷參數max_time是否大于等于0。如果大于等于0,就意味着參數max_time描述的是一個有限的時間,即線程不能無限進入睡眠狀态。在這種情況下,變量finite_time的值等于true。否則的話,變量finite_time的值等于false。

      第二段代碼如下所示:

kernel_->lock_.Acquire();
if (kernel_->signaled_) {
if (!kernel_->manual_reset_) {
// In this case we were signaled when we had no waiters. Now that
// someone has waited upon us, we can automatically reset.
kernel_->signaled_ = false;
}
kernel_->lock_.Release();
return true;
}
           

       這段代碼判斷目前處理的WaitableEvent是否已經處于Signaled狀态。如果是的話,目前線程就不需要進入睡眠狀态了,因為目前線程本來就是要等待目前處理的WaitableEvent處于Signaled狀态的。在這種情況下,WaitableEvent類的成員函數TimedWait就直接傳回一個true值給調用者,表示已經成功地等待目前處理的WaitableEvent處于Signaled狀态。

      不過在傳回之前,會判斷目前處理的WaitableEvent在建立時是否指定了當它處于Signaled狀态時,可以自動Reset為非Signaled狀态。從這裡我們就可以看出,一個WaitableEvent的狀态可以從Signaled自動Reset為非Signaled,指的就是一個當其在Signaled狀态時被執行Wait操作時,會自動變為非Signaled狀态。這樣在下一次執行Wait操作時,就要等到該WaitableEvent的狀态變為Signaled之後,WaitableEvent類的成員函數TimedWait才會傳回。

      從這裡我們就可以看到,WaitableEvent類的成員函數TimedWait是通過成員變量kernel_指向的一個WaitableEventKernel對象來獲得Signaled狀态的,進而可以避免一個WaitableEvent被銷毀的時候,我們仍然可以通路它的狀态,而不會引發程式Crash。

      從這裡我們還可以看到,通路WaitableEvent的狀态需要在加鎖的情況下進行,該鎖由其成員變量kernel_指向的一個WaitableEventKernel對象的成員變量lock_描述。同時,WaitableEvent類的成員函數TimedWait在傳回之前,需要釋放該鎖。

      第三段代碼如下所示:

SyncWaiter sw;
sw.lock()->Acquire();
Enqueue(&sw);
kernel_->lock_.Release();
           

      這段代碼在棧上建立一個SyncWaiter對象,并且通過調用成員函數Enqueue将其加入到目前正在處理的WaitableEvent的Waiter清單中,如下所示:

void WaitableEvent::Enqueue(Waiter* waiter) {
kernel_->waiters_.push_back(waiter);
}
           

       由于要操作目前正在處理的WaitableEvent的Waiter清單,是以WaitableEvent類的成員函數Enqueue需要在加鎖的情況下進行操作。

       SyncWaiter是一個用來描述同步Waiter的類,所謂同步Waiter,就是說線上程進入睡眠狀态這段時間,它是不會被銷毀的。是以,線上程進入睡眠狀态這段時間裡,我們可以安全地對它進行操作。

       SyncWaiter類的實作如下所示:

class SyncWaiter : public WaitableEvent::Waiter {
public:
SyncWaiter()
: fired_(false),
signaling_event_(NULL),
lock_(),
cv_(&lock_) {
}
virtual bool Fire(WaitableEvent* signaling_event) OVERRIDE {
base::AutoLock locked(lock_);
if (fired_)
return false;
fired_ = true;
signaling_event_ = signaling_event;
cv_.Broadcast();
return true;
}
WaitableEvent* signaling_event() const {
return signaling_event_;
}
virtual bool Compare(void* tag) OVERRIDE {
return this == tag;
}
bool fired() const {
return fired_;
}
void Disable() {
fired_ = true;
}
base::Lock* lock() {
return &lock_;
}
base::ConditionVariable* cv() {
return &cv_;
}
private:
bool fired_;
WaitableEvent* signaling_event_; // The WaitableEvent which woke us
base::Lock lock_;
base::ConditionVariable cv_;
};
           

       這個類定義在檔案external/chromium_org/base/synchronization/waitable_event_posix.cc中。 

       SyncWaiter類的核心是定義了一個互斥鎖lock_和一個條件變量cv_,它們分别可以通過成員函數lock和cv來獲得。

       SyncWaiter類有一個重要的成員函數Fire,它的作用喚醒睡眠在條件變量cv_的線程,并且将成員變量fired_設定為true,用來表示成員變量signaling_event_描述的WaitableEvent已經處于Signaled狀态。

       我們繼續分析WaitableEvent類的成員函數TimedWait的最後一段代碼,如下所示:

for (;;) {
const TimeTicks current_time(TimeTicks::Now());
if (sw.fired() || (finite_time && current_time >= end_time)) {
const bool return_value = sw.fired();
......
sw.lock()->Release();
kernel_->lock_.Acquire();
kernel_->Dequeue(&sw, &sw);
kernel_->lock_.Release();
return return_value;
}
if (finite_time) {
const TimeDelta max_wait(end_time - current_time);
sw.cv()->TimedWait(max_wait);
} else {
sw.cv()->Wait();
}
}
           

      這個for循環不斷檢查剛才已經加入到目前正在處理的WaitableEvent的Waiter清單的等待者sw是否已經被Fired。如果已經被Fired,那麼就說明目前正在處理的WaitableEvent已經處理Signaled狀态,是以就可以結束檢查,并且傳回了。不過在傳回之前,會調用WaitableEventKernel類的成員函數Dequeue将等待者sw從目前正在處理的WaitableEvent的Waiter清單删除。

       WaitableEventKernel類的成員函數Dequeue的實作如下所示:

bool WaitableEvent::WaitableEventKernel::Dequeue(Waiter* waiter, void* tag) {
for (std::list<Waiter*>::iterator
i = waiters_.begin(); i != waiters_.end(); ++i) {
if (*i == waiter && (*i)->Compare(tag)) {
waiters_.erase(i);
return true;
}
}
return false;
}
           

       這個函數定義在檔案external/chromium_org/base/synchronization/waitable_event_posix.cc中。

       從這裡我們可以看到,WaitableEventKernel類的成員函數Dequeue首先是在目前正在處理的WaitableEvent的Waiter清單找到參數waiter描述的Waiter,然後再将其從清單中删除。

       在目前正在處理的WaitableEvent的Waiter清單中查找參數waiter描述的Waiter,不僅要對比清單的Waiter與參數waiter描述的Waiter的位址是否相等,還要進一步以參數tag為參數,調用前者的成員函數Compare,隻有當該成員函數傳回值等于true時,才會将參數waiter描述的Waiter從清單中删除。之是以要這樣做,是與後面我們分析的異步Waiter有關的。這一點我們後面再詳細分析。

       回到WaitableEvent類的成員函數TimedWait的最後一段代碼中。雖然等待者sw沒有被Fired,但是WaitableEvent類的成員函數TimedWait的參數max_time指定了目前線程隻可以等待有限的時候,并且這個有限時間已經過去。在這種情況下,即使等待者sw沒有被Fired,那麼WaitableEvent類的成員函數TimedWait也要傳回了,不過這時候它的傳回值為false。

       最後,WaitableEvent類的成員函數TimedWait判斷線程是否隻是有限地進入睡眠狀态,即判斷變量finite_time的值是否等于true。如果等于true,那麼就會通過調用等待者sw内部的條件變量cv_的成員函數TimedWait使得目前線程進入睡眠狀态,并且指定最長的睡眠時間為max_wait。

       另一方面,如果WaitableEvent類的成員函數TimedWait判斷線程需要無限地進入睡眠狀态,那麼就會通過調用等待者sw内部的條件變量cv_的成員函數Timed使得目前線程進入無限睡眠狀态,直到被其它線程喚醒為止。

       我們最後繼續分析WaitableEvent類的成員函數Signal的實作,如下所示:

void WaitableEvent::Signal() {
base::AutoLock locked(kernel_->lock_);
if (kernel_->signaled_)
return;
if (kernel_->manual_reset_) {
SignalAll();
kernel_->signaled_ = true;
} else {
// In the case of auto reset, if no waiters were woken, we remain
// signaled.
if (!SignalOne())
kernel_->signaled_ = true;
}
}
           

       這個函數定義在檔案external/chromium_org/base/synchronization/waitable_event_posix.cc中。

       如果目前正在處理的WaitableEvent已經處于Signaled狀态,那麼WaitableEvent類的成員函數Signal就不需要再次将其修改為Signaled狀态并且喚醒等待者了。否則的話,就繼續往前執行。

       如果目前正在處理的WaitableEvent的Signaled狀态不可自動Reset為非Signaled狀态,那麼就調用成員函數SignalAll喚醒所有的等待者,并且儲存目前正在處理的WaitableEvent的狀态為Signaled狀态。

       如果目前正在處理的WaitableEvent的Signaled狀态可以自動Reset為非Signaled狀态,那麼就調用成員函數SignalOne喚醒其中的一個等待者。但是如果一個等待者都沒有被喚醒,那麼就會繼續保持目前正在處理的WaitableEvent的狀态為Signaled狀态。

       WaitableEvent類的成員函數SignalAll的實作如下所示:

bool WaitableEvent::SignalAll() {
bool signaled_at_least_one = false;
for (std::list<Waiter*>::iterator
i = kernel_->waiters_.begin(); i != kernel_->waiters_.end(); ++i) {
if ((*i)->Fire(this))
signaled_at_least_one = true;
}
kernel_->waiters_.clear();
return signaled_at_least_one;
}
           

       這個函數定義在檔案external/chromium_org/base/synchronization/waitable_event_posix.cc中。

       WaitableEvent類的成員函數SignalAll對目前正在處理的WaitableEvent的Waiter清單中的每一個Waiter,都調用其成員函數Fire,使得它們可以喚醒相應的線程。

       最後,WaitableEvent類的成員函數SignalAll會清空目前正在處理的WaitableEvent的Waiter清單,并且在至少喚醒一個Waiter的情況下,傳回一個true值給調用者,否則就傳回false。

       WaitableEvent類的成員函數SignalOne的實作如下所示:

bool WaitableEvent::SignalOne() {
for (;;) {
if (kernel_->waiters_.empty())
return false;
const bool r = (*kernel_->waiters_.begin())->Fire(this);
kernel_->waiters_.pop_front();
if (r)
return true;
}
}
           

       這個函數定義在檔案external/chromium_org/base/synchronization/waitable_event_posix.cc中。

       WaitableEvent類的成員函數SignalOne從目前正在處理的WaitableEvent的Waiter清單中的第一個Waiter開始,依次調用它們的成員函數Fire。隻要其中的某一個Waiter的成員函數Fire的傳回值為true,那麼就會停止周遊Waiter清單,并且傳回一個true值給調用者。如果沒有一個Waiter的成員函數Fire傳回值為true,那麼WaitableEvent類的成員函數SignalOne的傳回值就為false。注意,每一個被周遊過的Waiter,不管它的成員函數Fire的傳回值是什麼,它都會從Waiter清單删除。

       以上就是WaitableEvent通過同步等待者(SyncWaiter)實作的Wait和Signal操作。WaitableEvent還可以通過異步等待者實作異步的Wait和Signal操作。為了了解這種異步Wait和Signal操作,我們先看一個例子,如下所示:

class MyClass {
public:
void DoStuffWhenSignaled(WaitableEvent *waitable_event) {
watcher_.StartWatching(waitable_event,
base::Bind(&MyClass::OnWaitableEventSignaled, this);
}
private:
void OnWaitableEventSignaled(WaitableEvent* waitable_event) {
// OK, time to do stuff!
}
base::WaitableEventWatcher watcher_;
};
           

       當我們調用MyClass類的成員函數DoStuffWhenSignaled的時候,表示希望在參數waitable_event描述的一個WaitableEvent處于Signaled狀态時,可以調用MyClass類的另外一個成員函數OnWaitableEventSignaled幹點其它事情,這是通過成員變量watcher_描述的一個WaitableEventWatcher對象的成員函數StartWatching實作的。

       也就是說,通過WaitableEventWatcher類,我們可以監控一個WaitableEvent,使得它處于Signaled狀态時,獲得通知。

        WaitableEventWatcher的定義如下所示:

class BASE_EXPORT WaitableEventWatcher
: public MessageLoop::DestructionObserver {
public:
typedef Callback<void(WaitableEvent*)> EventCallback;
WaitableEventWatcher();
virtual ~WaitableEventWatcher();
bool StartWatching(WaitableEvent* event, const EventCallback& callback);
void StopWatching();
......
private:
......
MessageLoop* message_loop_;
scoped_refptr<Flag> cancel_flag_;
AsyncWaiter* waiter_;
base::Closure internal_callback_;
scoped_refptr<WaitableEvent::WaitableEventKernel> kernel_;
WaitableEvent* event_;
EventCallback callback_;
};
           

      這個類定義在檔案external/chromium_org/base/synchronization/waitable_event_watcher.h。

      WaitableEventWatcher類有兩個重要的成員函數StartWatching和StopWatching。前者用來監控一個WaitableEvent,并且當該WaitableEvent狀态變成Signaled時,調用一個EventCallback,它的實作如下所示:

bool WaitableEventWatcher::StartWatching(
WaitableEvent* event,
const EventCallback& callback) {
MessageLoop *const current_ml = MessageLoop::current();
......
cancel_flag_ = new Flag;
callback_ = callback;
internal_callback_ =
base::Bind(&AsyncCallbackHelper, cancel_flag_, callback_, event);
WaitableEvent::WaitableEventKernel* kernel = event->kernel_.get();
AutoLock locked(kernel->lock_);
event_ = event;
if (kernel->signaled_) {
if (!kernel->manual_reset_)
kernel->signaled_ = false;
// No hairpinning - we can't call the delegate directly here. We have to
// enqueue a task on the MessageLoop as normal.
current_ml->PostTask(FROM_HERE, internal_callback_);
return true;
}
message_loop_ = current_ml;
......
kernel_ = kernel;
waiter_ = new AsyncWaiter(current_ml, internal_callback_, cancel_flag_.get());
event->Enqueue(waiter_);
return true;
}
           

       這個函數定義在檔案external/chromium_org/base/synchronization/waitable_event_watcher_posix.cc中。

       WaitableEventWatcher類的成員函數StartWatching首先是獲得目前線程的消息循環,最終會儲存在成員變量message_loop_,這樣當參數event描述的WaitableEvent狀态變為Signaled時,将參數callback描述的一個EventCallback發送到該消息循環去,然後在目前線程中執行。

       接下來,WaitableEventWatcher類的成員函數StartWatching建立了一個Flag對象,并且儲存在成員變量cancel_flag_中。這個Flag對象的作用是用來處理在異步等待WaitableEvent狀态變為Signaled的過程中出現的異常情況的。由于是異步等待,是以就有可能要監控的WaitableEvent的狀态還沒有變為Signaled,進行監控的WaitableEventWatcher就已經被銷毀。

       例如,在前面舉的例子中,我們建立一個MyClass對象,然後調用它的成員函數DoStuffWhenSignaled對一個WaitableEvent進行監控。但是有可能該WaitableEvent的狀态還沒有變為Signaled,前面建立的MyClass對象就被銷毀。這意味着它内部通過成員變量watcher_描述的WaitableEventWatcher對象也會被銷毀。在這種情況下,如果要監控的WaitableEvent狀态變為Signaled,我們必須要保證已經被銷毀的MyClass對象的成員函數OnWaitableEventSignaled不會被調用,否則的話就會出錯了。

       為了能夠正确處理上述的異常情況,就必須要給一個WaitableEventWatcher關聯一個生命周期更長的Flag對象,該Flag對象在要監控的WaitableEvent狀态變為Signaled之前,不會被銷毀。

       Flag類的實作如下所示:

class Flag : public RefCountedThreadSafe<Flag> {
public:
Flag() { flag_ = false; }
void Set() {
AutoLock locked(lock_);
flag_ = true;
}
bool value() const {
AutoLock locked(lock_);
return flag_;
}
private:
friend class RefCountedThreadSafe<Flag>;
~Flag() {}
mutable Lock lock_;
bool flag_;
DISALLOW_COPY_AND_ASSIGN(Flag);
};
           

       這個類定義在檔案external/chromium_org/base/synchronization/waitable_event_watcher_posix.cc中。

       當一個WaitableEventWatcher被銷毀時,與它關聯的Flag對象的成員變量flag_的值就會被設定true,這意味着當監控的WatiableEvent狀态變為Signaled時,不需要執行之前指定的一個EventCallback。

       回到WaitableEventWatcher類的成員函數StartWatching中,接下來它建立一個Closure對象,并且儲存在成員變量internal_callback_中,該Closure對象綁定的函數為AsyncCallbackHelper,并且當它被調用時,傳遞給它的參數前面建立的Flag對象,以及參數callback和event描述的EventCallback對象和WaitableEvent對象。這樣我們就可以推斷出,當參數event描述的WaitableEvent對象狀态變為Signaled時,會通過函數AsyncCallbackHelper來間接地執行參數event描述的EventCallback對象。

       WaitableEventWatcher類的成員函數StartWatching接下來獲得要監控的WaitableEvent内部的一個WaitableEventKernel對象,然後通過該WaitableEventKernel對象判斷要監控的WaitableEvent的狀态是否已經是Signaled。如果是的話,那麼就需要等待了,直接将前面建立的Closure發送到目前線程的消息循環去等待執行即可。

       最後,如果要監控的WaitableEvent的狀态還沒有變為Signaled,那麼就需要進行等待了。這是通過建立一個類型為AsyncWaiter的異步等待者,并且将它加入到要監控的WaitableEvent的Waiter清單中去實作的。這裡我們就可以看到前面分析的同步等待和異步等待的差別。同步等待将一個SyncWaiter加入到一個WaitableEvent的Waiter清單去後,不能夠馬上傳回,而是要通過一個for循環不斷等待指定的WaitableEvent狀态變為Signaled為止,或者直到等待的時候超出指定的時間為止。

       AsyncWaiter類的實作如下所示:

class AsyncWaiter : public WaitableEvent::Waiter {
public:
AsyncWaiter(MessageLoop* message_loop,
const base::Closure& callback,
Flag* flag)
: message_loop_(message_loop),
callback_(callback),
flag_(flag) { }
virtual bool Fire(WaitableEvent* event) OVERRIDE {
// Post the callback if we haven't been cancelled.
if (!flag_->value()) {
message_loop_->PostTask(FROM_HERE, callback_);
}
// We are removed from the wait-list by the WaitableEvent itself. It only
// remains to delete ourselves.
delete this;
// We can always return true because an AsyncWaiter is never in two
// different wait-lists at the same time.
return true;
}
// See StopWatching for discussion
virtual bool Compare(void* tag) OVERRIDE {
return tag == flag_.get();
}
private:
MessageLoop *const message_loop_;
base::Closure callback_;
scoped_refptr<Flag> flag_;
};
           

       這個類定義在檔案external/chromium_org/base/synchronization/waitable_event_watcher_posix.cc中。

       從前面的分析可以知道,當要監控的WaitableEvent的狀态變為Signaled時,前面已經已經加入到它的Waiter清單的AsyncWaiter對象的成員函數Fire就會被調用。

       AsyncWaiter類的成員變量flag_指向的是一個Flag對象。前面提到,如果該Flag對象關聯的WaitableEventWatcher在要監控的WaitableEvent狀态還沒有變為Signaled就已經被銷毀,那麼該Flag對象的成員變量flag_的值就會被設定為true。在這種情況下,AsyncWaiter類的成員函數Fire就不需要執行成員變量callback_描述的一個Closure。

       另一方面,如果AsyncWaiter類的成員變量flag_指向的Flag對象的成員變量flag_的值保持為false,那麼就需要将成員變量callback_描述的一個Closure發送到成員變量message_loop_描述的一個消息循環去執行。

       從前面的分析可以知道,AsyncWaiter類的成員變量message_loop_描述的消息循環即為調用WaitableEventWatcher類的成員函數StartWatching的那個線程的消息循環,并且AsyncWaiter類的成員變量callback_描述的Closure綁定的函數為AsyncCallbackHelper,它的實作如下所示:

void AsyncCallbackHelper(Flag* flag,
const WaitableEventWatcher::EventCallback& callback,
WaitableEvent* event) {
// Runs in MessageLoop thread.
if (!flag->value()) {
// This is to let the WaitableEventWatcher know that the event has occured
// because it needs to be able to return NULL from GetWatchedObject
flag->Set();
callback.Run(event);
}
}
           

       這個函數定義在檔案external/chromium_org/base/synchronization/waitable_event_watcher_posix.cc中。

       參數flag描述的Flag對象就是前面在WaitableEventWatcher類的成員函數StartWatching中建立的Flag對象,參數evnet描述的就是要監控的WaitableEvent,而參數callback就是當參數evnet描述的WaitableEvent狀态變為Signaled時要執行的一個EventCallback。

       隻有在參數flag描述的Flag對象的成員變量flag_的值等于false的情況下,函數AsyncCallbackHelper才會執行參數callback描述的EventCallback,并且在執行該EventCallback之前,會将參數flag描述的Flag對象的成員變量flag_的值設定為true,用來表示參數callback描述的EventCallback已經執行過了。

       以上就是通過一個WaitableEventWatcher來監控一個WaitableEvent的狀态變為Signaled并且獲得通知的過程。前面我們還提及到一種情況,就是要監控的WaitableEvent的狀态尚未變為Signaled,執行監控的WaitableEventWatcher就已經被銷毀。接下來我們就繼續分析這種情況是如何處理的。

       當一個WaitableEventWatcher被銷毀時,它的析構函數就會被調用,如下所示:

WaitableEventWatcher::~WaitableEventWatcher() {
StopWatching();
}
           

       這個函數定義在檔案external/chromium_org/base/synchronization/waitable_event_watcher_posix.cc中。

       WaitableEventWatcher類的析構函數調用另外一個成員函數StopWatching停止監控之前在成員函數StartWatching指定的WaitableEvent。

       WaitableEventWatcher類的成員函數StopWatching的實作如下所示:

void WaitableEventWatcher::StopWatching() {
......
AutoLock locked(kernel_->lock_);
......
if (kernel_->Dequeue(waiter_, cancel_flag_.get())) {
....
delete waiter_;
internal_callback_.Reset();
cancel_flag_ = NULL;
return;
}
......
cancel_flag_->Set();
cancel_flag_ = NULL;
.....
}
           

       這個函數定義在檔案external/chromium_org/base/synchronization/waitable_event_watcher_posix.cc中。

       WaitableEventWatcher類的成員函數StopWatching要處理的邊界情況很多,這裡我們隻關心其中兩個最主要的情況。

       第一種情況是一個WaitableEventWatcher被銷毀時,它要監控的WaitableEvent的狀态依然是沒有變為Signaled。這時候該WaitableEventWatcher的成員變量waiter_指向的一個AsyncWaiter對象是在要監控的WaitableEvent的Waiter清單中。是以這時候調用與該WaitableEvent關聯的一個WaitableEventKernel對象的成員函數Dequeue可以将它從Waiter清單中删除,即調用上述WaitableEventKernel對象的成員函數Dequeue的傳回值為true。這樣就可以保證以後要監控的WaitableEvent的狀态變為Signaled時,目前被銷毀的WaitableEventWatcher不會獲得任務通知。

       第二種情況是一個WaitableEventWatcher被銷毀時,它要監控的WaitableEvent的狀态已經變為Signaled。從前面的分析可以知道,這時候該WaitableEventWatcher的成員變量waiter_指向的一個AsyncWaiter對象已經不在要監控的WaitableEvent的Waiter清單了,并且它的成員函數Fire已經被調用。但是該AsyncWaiter對象的成員變量callback_描述的一個Closure可能還沒有被排程執行,這時候就需要将與它關聯的一個Flag對象的成員變量flag_的值設定為true,保證該Closure不會被執行。這個Flag對象就是WaitableEventWatcher類的成員變量cancel_flag指向的Flag對象,調用它的成員函數Set即可将它的成員變量flag_的值設定為true。

       這裡有一點需要注意的是,WaitableEventWatcher類的成員函數StopWatching在調用WaitableEventKernel類的成員函數Dequeue從Waiter清單中删除成員變量waiter_描述的一個AsyncWaiter時,傳遞的參數除了要删除的AsyncWaiter的位址外,還包括與它關聯的一個Flag對象的位址。為什麼不可以隻傳遞要删除的AsyncWaiter的位址給WaitableEventKernel類的成員函數Dequeue呢?這是為了處理一種稱為ABA的問題的。

       當一個AsyncWaiter被Fired時,它的成員函數Fire會被調用,并且這個成員函數在執行自我銷毀的操作。如果這時候恰好其它地方又建立了一個AsyncWaiter,并且這個新建立的AsyncWaiter被添加到了同樣的WaitableEvent的Waiter清單中,更神奇的是這個新建立的AsyncWaiter占用的記憶體與前面被銷毀的AsyncWaiter占用的記憶體是完全一樣的。這樣就會導緻前面調用WaitableEventKernel類的成員函數Dequeue删除了一個不該删除的AsyncWaiter!

       注意,這種情況隻會出現在異步等待WaitableEvent狀态變為Signaled的情況,對于同步等待WaitableEvent狀态變為Signaled的情況是沒有這樣的問題的。分析為了避免這種情況,WaitableEventKernel類的成員函數Dequeue要求删除一個Waiter時,提供另外一個額外的參數tag,該參數會傳遞給即将被删除的Waiter的成員函數Compare。隻有被删除的Waiter存在Waiter清單中,并且它的成員函數Compare的傳回值也為true的情況下,WaitableEventKernel類的成員函數Dequeue才會将它從Waiter清單中删除,如下所示:

bool WaitableEvent::WaitableEventKernel::Dequeue(Waiter* waiter, void* tag) {
for (std::list<Waiter*>::iterator
i = waiters_.begin(); i != waiters_.end(); ++i) {
if (*i == waiter && (*i)->Compare(tag)) {
waiters_.erase(i);
return true;
}
}
return false;
}
           

       這個函數定義在檔案external/chromium_org/base/synchronization/waitable_event_posix.cc。

       AsyncWaiter類的成員函數Compare的實作如下所示:

class AsyncWaiter : public WaitableEvent::Waiter {
public:
......
// See StopWatching for discussion
virtual bool Compare(void* tag) OVERRIDE {
return tag == flag_.get();
}
private:
......
scoped_refptr<Flag> flag_;
};
           

       這個函數定義在檔案external/chromium_org/base/synchronization/waitable_event_watcher_posix.cc中。

       從這裡我們可以看到,隻有當參數tag的值等于AsyncWaiter類的成員變量flag_引用的一個Flag對象的位址值時,AsyncWaiter類的成員函數Compare的傳回值才為true。

       由于AsyncWaiter類的成員變量flag_是一個scoped_refptr智能指針,它引用的Flag對象同時也被關聯的WatiableEventWatcher對象通過scoped_refptr智能指針引用,這意味着當一個AsyncWaiter被銷毀時,它的成員變量flag_引用的Flag對象是仍然存在的,這樣通過比較它的位址值與參數tag的值,就可以差別出兩個前後配置設定在同一塊記憶體的AsyncWaiter對象來。

       至此,我們就分析完成了WaitableEvent類是如何實作線程在無消息處理時進入睡眠狀态,并且在有消息處理時從睡眠狀态喚醒過來的。總結來說,就是通過條件變量來實作的。同時,我們還知道,WaitableEvent類不僅可以實作同步等待,還可以實作異步等待,前者通過配合SyncWaiter類實作,而後者通過配合AsyncWaiter類實作。異步等待,或者說異步操作,是Chromium的一大特色。正是由于使用了大量的異步操作,Chromium才能做到快速地響應使用者輸入,進而使得使用者感覺到Chromium在打開網頁的時候非常快。

       回到MessagePumpDefault類的成員函數Run中,我們前面提到,每次它通過成員變量event_描述的一個WaitableEvent從睡眠狀态喚醒過來之後,會依次調用參數delegate描述的一個MessageLoop對象的成員函數DoWork和DoDelayedWork處理消息隊列的消息以及成員函數DoIdleWork處理一些線程空閑時任務。

       在分析MessageLoop類的成員函數DoWork、DoDelayedWork和DoIdleWork之前,我們首先分析向一個線程的消息隊列發送消息的過程。這是通過我們在前面提到的MessageLoop類的成員函數PostTask、PostDelayedTask、 PostNonNestableTask和PostNonNestableDelayedTask實作的。它們的定義如下所示:

void MessageLoop::PostTask(
const tracked_objects::Location& from_here,
const Closure& task) {
......
incoming_task_queue_->AddToIncomingQueue(from_here, task, TimeDelta(), true);
}
void MessageLoop::PostDelayedTask(
const tracked_objects::Location& from_here,
const Closure& task,
TimeDelta delay) {
......
incoming_task_queue_->AddToIncomingQueue(from_here, task, delay, true);
}
void MessageLoop::PostNonNestableTask(
const tracked_objects::Location& from_here,
const Closure& task) {
......
incoming_task_queue_->AddToIncomingQueue(from_here, task, TimeDelta(), false);
}
void MessageLoop::PostNonNestableDelayedTask(
const tracked_objects::Location& from_here,
const Closure& task,
TimeDelta delay) {
......
incoming_task_queue_->AddToIncomingQueue(from_here, task, delay, false);
}
           

       這四個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

       MessageLoop類的上述四個成員函數都是通過調用成員變量incoming_task_queue_描述的一個IncomingTaskQueue對象的成員函數AddToIncomingQueue來發送參數task描述的一個Closure對象到線程的消息隊列去的。

       IncomingTaskQueue類的成員函數AddToIncomingQueue的實作如下所示:

bool IncomingTaskQueue::AddToIncomingQueue(
const tracked_objects::Location& from_here,
const Closure& task,
TimeDelta delay,
bool nestable) {
AutoLock locked(incoming_queue_lock_);
PendingTask pending_task(
from_here, task, CalculateDelayedRuntime(delay), nestable);
return PostPendingTask(&pending_task);
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/incoming_task_queue.cc中。

       IncomingTaskQueue類的成員函數AddToIncomingQueue首先将參數task描述的Closure封裝在一個PendingTask中,然後調用另外一個成員函數PostPendingTask将它發送到線程的消息隊列中去。由于消息隊列既會被發送消息的線程操作,也會被處理消息的線程操作,是以它需要在加鎖的前提下進行操作。這個鎖通過IncomingTaskQueue類的成員變量incoming_queue_lock_描述。

       IncomingTaskQueue類的成員函數PostPendingTask的實作如下所示:

bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) {
......
// This should only be called while the lock is taken.
incoming_queue_lock_.AssertAcquired();
......
bool was_empty = incoming_queue_.empty();
incoming_queue_.push(*pending_task);
pending_task->task.Reset();
// Wake up the pump.
message_loop_->ScheduleWork(was_empty);
return true;
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/incoming_task_queue.cc中。

       IncomingTaskQueue類的成員函數PostPendingTask首先是調用成員變量incoming_queue_lock_描述的一個鎖的成員函數AssertAcquired確定該鎖已經被調用者擷取,因為它接下來要操作線程的消息隊列。

       IncomingTaskQueue類的成員函數PostPendingTask接下來要做的事情很簡單,一是将參數pending_task描述的一個PendingTask儲存在成員變量incoming_queue_描述的一個TaskQueue中,二是調用成員變量message_loop_描述的一個MessageLoop的成員函數ScheduleWork喚醒線程對剛才添加在TaskQueue的PendingTask進行處理。

       注意,在調用MessageLoop類的成員函數ScheduleWork的時候,傳遞有一個參數was_empty,該參數用來描述在添加參數pending_task描述的一個PendingTask到線程的消息隊列之前,線程的消息隊列是否為空。如果為空,意味着線程目前處于無限睡眠狀态中,是以需要主動喚醒它。如果不為空,則說明線程目前要麼正在運作,要麼是處于一個會自動喚醒過來的睡眠狀态中。後面這種情況不需要喚醒線程的。

       MessageLoop類的成員函數ScheduleWork的實作如下所示:

bool AlwaysNotifyPump(MessageLoop::Type type) {
#if defined(OS_ANDROID)
return type == MessageLoop::TYPE_UI || type == MessageLoop::TYPE_JAVA;
#else
return false;
#endif
}
......
void MessageLoop::ScheduleWork(bool was_empty) {
// The Android UI message loop needs to get notified each time
// a task is added to the incoming queue.
if (was_empty || AlwaysNotifyPump(type_))
pump_->ScheduleWork();
}
           

       這兩個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

       如前所述,當參數was_empty的值等于true的時候,MessageLoop類的成員函數ScheduleWork就會調用成員變量pump_描述的一個MessagePump對象的成員函數ScheduleWork來喚醒線程。

       但是對于Android平台來說,如果目前正在處理的MessageLoop關聯的是一個UI線程或者Java線程,不管參數was_empty的值是否為true,都需要喚醒它們。這是因為對于Android平台來說,UI線程和Java線程在Java層有着自己的消息循環,Native層的消息循環是借助于Java層的消息循環來實作的。這意味着線程的消息循環不是由Native來管理的,也就是Native層不知道Java的消息循環的管理邏輯,它就隻有每當有新的消息加入,都通知一下Java層對該消息進行處理。後面我們再詳細分析Android平台的UI線程和Java線程在Native層的消息循環的實作。

       前面我們假設MessageLoop類的成員變量pump_指向的是一個MessagePumpDefault對象,是以接下來MessageLoop類的成員函數ScheduleWork調用的是MessagePumpDefault類的成員函數ScheduleWork,它的實作如下所示:

void MessagePumpDefault::ScheduleWork() {
// Since this can be called on any thread, we need to ensure that our Run
// loop wakes up.
event_.Signal();
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_pump_default.cc中。

       前面分析MessagePumpDefault類的成員函數Run的時候提到,如果一個線程目前是處于睡眠狀态的,那麼它就是通過調用成員變量event_描述的一個WaitableEvent的成員函數Wait或者TimedWait進入到睡眠狀态的,是以現在就可以通過調用該WaitableEvent的成員函數Signal來喚醒它。這個喚醒過程可以參考前面分析的WaitableEvent類的成員函數Signal的實作。

       線程被喚醒之後,如前所述,就會依次調用MessageLoop類的成員函數DoWork、DoDelayedWork和DoIdleWork。

       MessageLoop類的成員函數DoWork的實作如下所示:

bool MessageLoop::DoWork() {
if (!nestable_tasks_allowed_) {
// Task can't be executed right now.
return false;
}
for (;;) {
ReloadWorkQueue();
if (work_queue_.empty())
break;
// Execute oldest task.
do {
PendingTask pending_task = work_queue_.front();
work_queue_.pop();
if (!pending_task.delayed_run_time.is_null()) {
AddToDelayedWorkQueue(pending_task);
// If we changed the topmost task, then it is time to reschedule.
if (delayed_work_queue_.top().task.Equals(pending_task.task))
pump_->ScheduleDelayedWork(pending_task.delayed_run_time);
} else {
if (DeferOrRunPendingTask(pending_task))
return true;
}
} while (!work_queue_.empty());
}
// Nothing happened.
return false;
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

       在預設情況下,消息是禁止嵌套處理的,也就是說,線程在處理一個消息的過程時,不能夠處理其它消息,這時候MessageLoop類的成員變量nestable_tasks_allowed_的值會被設定為false。是以,MessageLoop類的成員函數DoWork首先是判斷成員變量nestable_tasks_allowed_的值是否等于false。如果等于的話,就什麼也不做就傳回了。

       如果我們确實嵌套處理消息,那麼需要通過ScopedNestableTaskAllower類臨時設定線程的MessageLoop允許執行嵌套消息,即将MessageLoop類的成員變量nestable_tasks_allowed_設定為true。

       ScopedNestableTaskAllower類的實作如下所示:

class BASE_EXPORT MessageLoop : public MessagePump::Delegate {
public:
......
// Enables nestable tasks on |loop| while in scope.
class ScopedNestableTaskAllower {
public:
explicit ScopedNestableTaskAllower(MessageLoop* loop)
: loop_(loop),
old_state_(loop_->NestableTasksAllowed()) {
loop_->SetNestableTasksAllowed(true);
}
~ScopedNestableTaskAllower() {
loop_->SetNestableTasksAllowed(old_state_);
}
private:
MessageLoop* loop_;
bool old_state_;
};
......
};
           

       這個類定義在檔案external/chromium_org/base/message_loop/message_loop.h中。

       ScopedNestableTaskAllower類的構造函數調用了MessageLoop類的成員函數SetNestableTasksAllowed将線程的消息循環設定為可嵌套執行消息,并且在析構函數中也是調用MessageLoop類的成員函數SetNestableTasksAllowed将線程的消息循環設定為不可嵌套執行消息。

       回到MessageLoop類的成員函數DoWork中,它接下來是通過兩個循環不斷地處理線程的消息隊列的消息,直到該消息隊列為空為止。

       在外層的循環中,MessageLoop類的成員函數DoWork首先是調用另外一個成員函數ReloadWorkQueue将儲存在成員變量incoming_task_queue_描述的一個IncomingTaskQueue中的消息提取出來,儲存在成員變量work_queue_描述的一個TaskQueue中,然後再通過内層的循環對儲存在該TaskQueue的每一個消息進行處理。

       MessageLoop類的成員函數ReloadWorkQueue的實作如下所示:

void MessageLoop::ReloadWorkQueue() {
......
if (work_queue_.empty())
incoming_task_queue_->ReloadWorkQueue(&work_queue_);
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

       在成員變員work_queue_描述的TaskQueue為空的情況下, MessageLoop類的成員函數ReloadWorkQueue調用成員變量incoming_task_queue_描述的一個IncomingTaskQueue對象的成員函數ReloadWorkQueue将它裡面消息都提取到成員變員work_queue_描述的一個TaskQueue中。

       IncomingTaskQueue類的成員函數ReloadWorkQueue的實作如下所示:

void IncomingTaskQueue::ReloadWorkQueue(TaskQueue* work_queue) {
......
// Acquire all we can from the inter-thread queue with one lock acquisition.
AutoLock lock(incoming_queue_lock_);
if (!incoming_queue_.empty())
incoming_queue_.Swap(work_queue); // Constant time
......
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/incoming_task_queue.cc中。

       在成員變量incoming_queue_描述的TaskQueue不為空的情況下, IncomingTaskQueue類的成員函數ReloadWorkQueue通過交換它和參數work_queue描述的TaskQueue即可将它裡面的消息都提取給參數work_queue描述的TaskQueue。

       回到MessageLoop類的成員函數DoWork中,它接下來就通過内層循環對已經提取到成員變量work_queue_描述的TaskQueue的消息進行處理,為了友善描述,我們将相應的代碼再列出來,如下所示:

// Execute oldest task.
do {
PendingTask pending_task = work_queue_.front();
work_queue_.pop();
if (!pending_task.delayed_run_time.is_null()) {
AddToDelayedWorkQueue(pending_task);
// If we changed the topmost task, then it is time to reschedule.
if (delayed_work_queue_.top().task.Equals(pending_task.task))
pump_->ScheduleDelayedWork(pending_task.delayed_run_time);
} else {
if (DeferOrRunPendingTask(pending_task))
return true;
}
} while (!work_queue_.empty());
           

      這段代碼依次地将儲存在成員變量work_queue_描述的TaskQueue中的消息提取出來。每一個消息都是通過一個PendingTask描述的。

      如果提取出來的消息是一個延遲處理的消息,即對應的PendingTask對象的成員變量delayed_run_time設定的時間不為空,那麼就會調用MessageLoop類的成員函數AddToDelayedWorkQueue将它添加到另外一個延遲處理的消息隊列中。如果該延遲消息被添加到了延遲處理消息隊列的頭部,那麼就意味着要修改線程的下一次進入睡眠狀态的時間長度,這是因為儲存在處遲處理消息隊列的消息是按照延遲處理時間從小到大的順序排序的。

      MessageLoop類的成員函數AddToDelayedWorkQueue的實作如下所示:

void MessageLoop::AddToDelayedWorkQueue(const PendingTask& pending_task) {
// Move to the delayed work queue.
delayed_work_queue_.push(pending_task);
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

       MessageLoop類使用的延遲處理消息隊列由成員變量delayed_work_queue_描述的一個DelayedTaskQueue對象表示,當調用它的成員函數push新添加一個PendingTask時,就會根據該PendingTask延遲執行時間點将放在隊列的合适位置,使得隊列始終是按照延遲執行時間點從小到大的順序排列它裡面的PendingTask。

       修改線程的下一次進入睡眠狀态的時間長度是通過調用MessageLoop類的成員變量pump_指向的一個MessagePumpDefault對象的成員函數ScheduleDelayedWork實作的,它的實作如下所示:

void MessagePumpDefault::ScheduleDelayedWork(
const TimeTicks& delayed_work_time) {
// We know that we can't be blocked on Wait right now since this method can
// only be called on the same thread as Run, so we only need to update our
// record of how long to sleep when we do sleep.
delayed_work_time_ = delayed_work_time;
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_pump_default.cc中。

       MessagePumpDefault類的成員函數ScheduleDelayedWork隻是簡單地将參數delayed_work_time描述的時間儲存在成員變量delayed_work_time_中。等到MessageLoop類的成員函數DoWork執行完畢回到MessagePumpDefault類的成員函數Run時,該時間就會用來計算線程下一次要進入睡眠狀态的時間長度。

       最後,如果一個消息需要馬上處理,那麼MessageLoop類的成員函數DoWork的内層循環就會調用另外一個成員函數DeferOrRunPendingTask來對它進行處理。

       MessageLoop類的成員函數DeferOrRunPendingTask的實作如下所示:

bool MessageLoop::DeferOrRunPendingTask(const PendingTask& pending_task) {
if (pending_task.nestable || run_loop_->run_depth_ == 1) {
RunTask(pending_task);
// Show that we ran a task (Note: a new one might arrive as a
// consequence!).
return true;
}
// We couldn't run the task now because we're in a nested message loop
// and the task isn't nestable.
deferred_non_nestable_work_queue_.push(pending_task);
return false;
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

       參數pending_task描述的消息能夠馬上執行需要滿足以下兩個條件之一:

       1.  參數pending_task描述的消息是一個可嵌套處理的消息,即對應的PendingTask對象的成員變量nestable的值等于true。

       2.  參數pending_task描述的消息不是一個可嵌套處理的消息,但是線程目前運作在最外層的消息循環中,即MessageLoop類的成員變量run_loop_描述的一個RunLoop對象的成員變量run_depth_的值等于1。

       如果以上兩個條件都不能滿足,那麼就将參數pending_task描述的消息添加到成員變量deferred_non_nestable_work_queue_描述的一個TaskQueue中等待在合适的時候再處理。

       如果能滿足以上兩個條件之一,那麼就将參數pending_task描述的消息就會被MessageLoop類的成員函數RunTask進行處理,如下所示:

void MessageLoop::RunTask(const PendingTask& pending_task) {
......
// Execute the task and assume the worst: It is probably not reentrant.
nestable_tasks_allowed_ = false;
......
pending_task.task.Run();
......
nestable_tasks_allowed_ = true;
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

       從這裡就可以看到,MessageLoop類的成員函數RunTask在執行參數pending_task描述的消息之前,會先将成員變量nestable_tasks_allowed_的值設定為false,用來禁止線程嵌套執行其它消息,并且在執行完成參數pending_task描述的消息的之後,将成員變量nestable_tasks_allowed_的值重新設定為true。 

       從前面分析的MessageLoop類的成員函數PostTask、PostDelayedTask、 PostNonNestableTask和PostNonNestableDelayedTask的實作可以知道,參數pending_task描述的消息實際上是一個Closure對象,該Closure對象儲存在參數pending_task指向的一個PendingTask對象的成員變量task中。從前面 Chromium多線程通信的Closure機制分析一文可以知道,調用該Closure對象的成員函數Run即可執行它描述的任務。 

       接下來我們繼續分析MessageLoop類的成員函數DoDelayedWork的實作,如下所示:

bool MessageLoop::DoDelayedWork(TimeTicks* next_delayed_work_time) {
if (!nestable_tasks_allowed_ || delayed_work_queue_.empty()) {
recent_time_ = *next_delayed_work_time = TimeTicks();
return false;
}
......
TimeTicks next_run_time = delayed_work_queue_.top().delayed_run_time;
if (next_run_time > recent_time_) {
recent_time_ = TimeTicks::Now(); // Get a better view of Now();
if (next_run_time > recent_time_) {
*next_delayed_work_time = next_run_time;
return false;
}
}
PendingTask pending_task = delayed_work_queue_.top();
delayed_work_queue_.pop();
if (!delayed_work_queue_.empty())
*next_delayed_work_time = delayed_work_queue_.top().delayed_run_time;
return DeferOrRunPendingTask(pending_task);
}
           

      這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

      參數DoDelayedWork描述的是下一個延遲處理消息的執行時間點。

      如果線程的延遲處理消息隊列空,那麼MessageLoop類的成員函數DoDelayedWork就隻是簡單地置空成員變量recent_time_和參數next_delayed_work_time描述的時間變傳回了。

      另一方面,與MessageLoop類的成員函數DoWork類似,MessageLoop類的成員函數DoDelayedWork也禁止處理嵌套消息,是以,當MessageLoop類的成員變量nestable_tasks_allowed_的值等于false的時候,MessageLoop類的成員函數DoDelayedWork就直接傳回。

      如果MessageLoop類的成員函數DoDelayedWork可以繼續往下執行,那麼它就檢查位于延遲處理消息隊列頭部的消息,并且判斷它的執行時間是否大于目前時間。如果是的話的,那麼就說明該消息還未到時間執行,是以MessageLoop類的成員函數DoDelayedWork不會執行它,而是直接傳回。

      最後,如果位于延遲處理消息隊列頭部的消息的執行時間小于等于目前時間,那麼就是時間将它從隊列中取出,并且執行了。在執行之前,MessageLoop類的成員函數DoDelayedWork會獲得下一個延遲處理消息的執行時間點,并且儲存在參數next_delayed_work_time描述的一個TimeTicks對象,以便傳回到MessagePumpDefault類的成員函數Run的時候,後者可以計算出下一次進入睡眠狀态的時間長度。

      延遲處理消息同樣是通過我們在前面分析的MessageLoop類的成員函數DeferOrRunPendingTask來執行的,是以這裡不再複述。

      接下來我們繼續分析MessageLoop類的成員函數DoIdleWork的實作,如下所示:

bool MessageLoop::DoIdleWork() {
if (ProcessNextDelayedNonNestableTask())
return true;
if (run_loop_->quit_when_idle_received_)
pump_->Quit();
return false;
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

       MessageLoop類的成員函數DoIdleWork調用另外一個成員函數ProcessNextDelayedNonNestableTask處理那些被延遲的不能嵌套處理的消息。

       如果線程沒有被延遲的不能嵌套處理的消息,并且目前消息循環使用的RunLoop的成員變量quit_when_idle_received_的值被設定為true,即線程被設定在空閑時無事可做時,就會退出線程,這是通過調用成員變量pump_指向的一個MessagePumpDefault對象的成員函數Quit來實作的。

       接下來我們主要分析MessageLoop類的成員函數ProcessNextDelayedNonNestableTask的實作,如下所示:

bool MessageLoop::ProcessNextDelayedNonNestableTask() {
if (run_loop_->run_depth_ != 1)
return false;
if (deferred_non_nestable_work_queue_.empty())
return false;
PendingTask pending_task = deferred_non_nestable_work_queue_.front();
deferred_non_nestable_work_queue_.pop();
RunTask(pending_task);
return true;
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

       被延遲的不能嵌套處理的消息儲存在MessageLoop類的成員變量deferred_non_nestable_work_queue_描述的一個TaskQueue中,這些消息隻能夠在最外層的消息循環中執行。是以,MessageLoop類的成員函數ProcessNextDelayedNonNestableTask首先判斷線程目前是否運作在最外層的消息循環中,即判斷成員變量run_loop_指向的一個RunLoop對象的成員變量run_depth_的值是否等于1。如果不等于1,那麼就直接傳回不往下執行了。

       如果線程目前是運作在最外層的消息循環中,那麼接下就從成員變量deferred_non_nestable_work_queue_描述的一個TaskQueue的頭部取出一個消息,并且調用前面分析過的成員函數RunTask對它進行處理。

       至此,我們就分析完成了線程的啟動、圍繞消息隊列運作、發送消息和處理消息的過程了。其中,圍繞消息隊列運作這一過程是針對普通的線程的。對于Android平台的UI線程和Java線程,由于它們在Java層使用了Android系統提供的消息循環機制,是以如果我們需要在Native層使用Chromium提供的消息循環機制,就要進行特殊處理。接下來我們就繼續分析如何在Android平台的UI線程和Java線程中使用Chromium提供的消息循環機制。關于Android系統提供的消息循環機制,可以參考 Android應用程式消息處理機制(Looper、Handler)分析一文。

       對于Android平台的UI線程和Java線程來說,它們使用的消息循環和消息泵分别是通過MessageLoopForUI類和MessagePumpForUI類描述的,這就差別于一般線程使用MessageLoop類和MesagePumpDefault來描述消息循環和消息泵。接下來我們以UI線程為例來說明它是如何使用Chromium提供的消息循環機制的。對于Java線程,原理是一樣的。

       當我們在Android應用程式中使用WebView的時候,會在UI線程中調用BrowserMainRunnerImpl類的成員函數Initialize執行一些初始化工作,其中就包括在Native層中建立一個類型為MessageLoopForUI的消息循環。在後面的文章中,我們分析WebView的啟動過程時,就會看到這一過程。現在我們直接分析BrowserMainRunnerImpl類的成員函數Initialize的實作,如下所示:

class BrowserMainRunnerImpl : public BrowserMainRunner {
public:
......
virtual int Initialize(const MainFunctionParams& parameters) OVERRIDE {
......
if (!initialization_started_) {
initialization_started_ = true;
main_loop_.reset(new BrowserMainLoop(parameters));
......
main_loop_->EarlyInitialization();
......
main_loop_->MainMessageLoopStart();
......
}
......
int result_code = main_loop_->GetResultCode();
if (result_code > 0)
return result_code;
// Return -1 to indicate no early termination.
return -1;
}
......
};
           

       這個函數定義在檔案external/chromium_org/content/browser/browser_main_runner.cc。

       BrowserMainRunnerImpl類的成員函數Initialize首先是建立了一個BrowserMainLoop對象,并且儲存在成員變量main_loop_中。接下來再調用該BrowserMainLoop對象的成員函數EarlyInitialization執行早期初始化工作。這個早期初始化工作就包括了建立一個類型為MessageLoopForUI的消息循環,如下所示:

void BrowserMainLoop::EarlyInitialization() {
......
if (parts_)
parts_->PreEarlyInitialization();
......
}
           

       這個函數定義在檔案external/chromium_org/content/browser/browser_main_loop.cc中。

       BrowserMainLoop類的成員變量parts_指向的是一個AwBrowserMainParts對象,這裡調用它的成員函數PreEarlyInitialization建立一個類型為MessageLoopForUI的消息循環,如下所示:

void AwBrowserMainParts::PreEarlyInitialization() {
......
// Android WebView does not use default MessageLoop. It has its own
// Android specific MessageLoop. Also see MainMessageLoopRun.
DCHECK(!main_message_loop_.get());
main_message_loop_.reset(new base::MessageLoopForUI);
base::MessageLoopForUI::current()->Start();
}
           

       這個函數定義在檔案external/chromium_org/android_webview/browser/aw_browser_main_parts.cc中。

       AwBrowserMainParts類的成員函數PreEarlyInitialization建立了一個MessageLoopForUI對象,接着再調用它的成員函數Start執行啟動工作。

       接下來我們先分析MessageLoopForUI對象的建立過程,即MessageLoopForUI類的構造函數的實作,如下所示:

class BASE_EXPORT MessageLoopForUI : public MessageLoop {
public:
MessageLoopForUI() : MessageLoop(TYPE_UI) {
}
......
};
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.h中。

       MessageLoopForUI類繼承了MessageLoop類。MessageLoopForUI類的構造函數在調用父類MessageLoop的構造函數時,傳遞進去的參數為TYPE_UI,表示要建立一個類型為TYPE_UI的消息循環。從前面的分析可以知道,MessageLoop類的構造函數會根據傳遞給它的參數TYPE_UI建立一個類型為MessagePumpForUI的消息泵,并且儲存在成員變量pump_中。

       回到AwBrowserMainParts類的成員函數PreEarlyInitialization中,當它建立了一個MessageLoopForUI對象之後,接下來就會調用它的成員函數Start執行啟動工作,如下所示:

void MessageLoopForUI::Start() {
// No Histogram support for UI message loop as it is managed by Java side
static_cast<MessagePumpForUI*>(pump_.get())->Start(this);
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

       MessageLoopForUI類的成員變量pump_是從父類MessageLoop繼承下來的,從前面的分析可以知道,它指向的是一個MessagePumpForUI對象,這裡調用它的成員函數Start執行啟動工作,如下所示:

void MessagePumpForUI::Start(Delegate* delegate) {
run_loop_ = new RunLoop();
// Since the RunLoop was just created above, BeforeRun should be guaranteed to
// return true (it only returns false if the RunLoop has been Quit already).
if (!run_loop_->BeforeRun())
NOTREACHED();
DCHECK(system_message_handler_obj_.is_null());
JNIEnv* env = base::android::AttachCurrentThread();
DCHECK(env);
system_message_handler_obj_.Reset(
Java_SystemMessageHandler_create(
env, reinterpret_cast<intptr_t>(delegate)));
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_pump_android.cc中。

       MessagePumpForUI類的成員函數Start首先是建立了一個RunLoop對象,并且調用它的成員函數BeforeRun建立最外層的消息循環,接下來調用函數Java_SystemMessageHandler_create建立了一個Java層的SystemMessageHandler對象,并且儲存在成員變量system_message_handler_obj_對象。這個SystemMessageHandler對象就是Java層和Native層的消息循環之間進行通信的橋梁。

       這樣,Android應用程式的UI線程在Chromium中使用的消息循環就啟動起來了。注意,Android應用程式的UI線程是在Java層圍繞Android的消息隊列運作的,是以它不能像普通的線程一樣,也在Native層圍繞Chromium的消息隊列運作,也就是Android應用程式的UI線程不會調用MessageLoopForUI類的成員函數Run進入運作狀态。

       這一步執行完成之後,一直傳回到BrowserMainRunnerImpl類的成員函數Initialize,它接下來調用前面建立的BrowserMainLoop對象的成員函數MainMessageLoopStart,繼續執行其它的初始化工作,如下所示:

void BrowserMainLoop::MainMessageLoopStart() {
......
InitializeMainThread();
......
}
           

      這個函數定義在檔案external/chromium_org/content/browser/browser_main_loop.cc中。

      BrowserMainLoop對象的成員函數MainMessageLoopStart執行的其中一個工作是調用另外一個成員函數InitializeMainThread初始化主線程,如下所示:

void BrowserMainLoop::InitializeMainThread() {
......
// Register the main thread by instantiating it, but don't call any methods.
main_thread_.reset(
new BrowserThreadImpl(BrowserThread::UI, base::MessageLoop::current()));
}
           

       這個函數定義在檔案external/chromium_org/content/browser/browser_main_loop.cc中。

       BrowserMainLoop類的成員函數InitializeMainThread為主線程建立了一個BrowserThreadImpl對象,并且儲存在成員變量main_thread_中。

       BrowserThreadImpl對象的建立過程如下所示:

BrowserThreadImpl::BrowserThreadImpl(ID identifier,
base::MessageLoop* message_loop)
: Thread(message_loop->thread_name()), identifier_(identifier) {
set_message_loop(message_loop);
Initialize();
}
           

       這個函數定義在檔案external/chromium_org/content/browser/browser_thread_impl.cc。

       從前面的調用過程可以知道,參數identifier的值為BrowserThread::UI,而參數message_loop指向的是一個MessageLoopForUI對象。

       BrowserThreadImpl類是從我們前面分析過的base::Thread類繼承下來的,BrowserThreadImpl類調用的成員函數set_message_loop也是從父類base::Thread繼承下來的,這裡調用它來為UI線程設定一個消息循環,如下所示:

class BASE_EXPORT Thread : PlatformThread::Delegate {
public:
......
protected:
......
void set_message_loop(MessageLoop* message_loop) {
message_loop_ = message_loop;
}
......
private:
......
MessageLoop* message_loop_;
......
};
           

       這個函數定義在檔案external/chromium_org/base/threading/thread.h中。

       這意味Android應用程式的UI線程雖然不像其它線程一樣,不能在Native層圍繞Chromium的消息隊列運作,但是它也像其線程一樣,使用一個Thead對象來描述,并且這個Thread對象具有一個類型為MessageLoopForUI的消息循環。

       傳回到BrowserThreadImpl類的構造函數中,設定了Android應用程式的UI線程的消息循環之外,接下來調用另外一個成員函數Initialize執行其它的初始化工作,如下所示:

base::LazyInstance<BrowserThreadGlobals>::Leaky
g_globals = LAZY_INSTANCE_INITIALIZER;
......
void BrowserThreadImpl::Initialize() {
BrowserThreadGlobals& globals = g_globals.Get();
base::AutoLock lock(globals.lock);
......
globals.threads[identifier_] = this;
}
           

       這個函數定義在檔案external/chromium_org/content/browser/browser_thread_impl.cc中。

       BrowserThreadImpl類的成員函數Initialize做的工作實際上就是将用來描述UI線程的一個BrowserThreadImpl對象注冊在全局變量g_globals指向的一個BrowserThreadGlobals對象的成員變量threads描述的一個BrowserThreadImpl數組中。

       有了這個BrowserThreadImpl數組之後,以後就可以通過BrowserThread類的靜态成員函數PostTask、PostDelayedTask、PostNonNestableTask和PostNonNestableDelayedTask等向UI線程發送消息。以BrowserThread類的靜态成員函數PostTask為例,調用它向UI線程發送消息的代碼如下所示:

BrowserThread::PostTask(BrowserThread::UI, FROM_HERE, task);
           

       BrowserThread類的靜态成員函數PostTask的實作如下所示:

bool BrowserThread::PostTask(ID identifier,
const tracked_objects::Location& from_here,
const base::Closure& task) {
return BrowserThreadImpl::PostTaskHelper(
identifier, from_here, task, base::TimeDelta(), true);
}
           

      這個函數定義在檔案external/chromium_org/content/browser/browser_thread_impl.cc中。

      BrowserThread類的靜态成員函數PostTask調用了BrowserThreadImpl類的靜态成員函數PostTaskHelper向參數identifier描述的線程的消息隊列發送一個Closure。

      BrowserThreadImpl類的靜态成員函數PostTaskHelper的實作如下所示:

bool BrowserThreadImpl::PostTaskHelper(
BrowserThread::ID identifier,
const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay,
bool nestable) {
......
base::MessageLoop* message_loop =
globals.threads[identifier] ? globals.threads[identifier]->message_loop()
: NULL;
if (message_loop) {
if (nestable) {
message_loop->PostDelayedTask(from_here, task, delay);
} else {
message_loop->PostNonNestableDelayedTask(from_here, task, delay);
}
}
.....
return !!message_loop;
}
           

       這個函數定義在檔案external/chromium_org/content/browser/browser_thread_impl.cc中。

       BrowserThreadImpl類的靜态成員函數PostTaskHelper首先是根據參數identifier在前面描述的全局變量g_globals指向的一個BrowserThreadGlobals對象的成員變量threads描述的一個BrowserThreadImpl數組中獲得一個對應的BrowserThreadImpl對象,接着再通過調用BrowserThreadImpl對象的成員函數message_loop獲得一個MessageLoop對象。有了這個MessageLoop對象之後,就可以調用它的成員函數PostDelayedTask或者PostNonNestableDelayedTask向指定的線程的消息隊列發送消息了。

       前面我們已經分析過MessageLoop類的成員函數PostDelayedTask和PostNonNestableDelayedTask了。從前面的分析可以知道,當它們向線程的消息隊列發送了一個消息之後,最後會調用它的成員變量pump_描述的一個MessagePump對象的成員函數ScheduleWork喚醒線程,以便它可以處理新發送到消息隊列的消息。

       從前面的分析可以知道,對于UI線程來說,它使用的消息循環通過類MessageLoopForUI來描述,而MessageLoopForUI類的成員變量pump_指向的是一個MessagePumpForUI對象。MessagePumpForUI類是從MessagePump類繼承下來的,并且重寫了成員函數ScheduleWork,如下所示:

void MessagePumpForUI::ScheduleWork() {
DCHECK(!system_message_handler_obj_.is_null());
JNIEnv* env = base::android::AttachCurrentThread();
DCHECK(env);
Java_SystemMessageHandler_scheduleWork(env,
system_message_handler_obj_.obj());
}
           

      這個函數定義在檔案/external/chromium_org/base/message_loop/message_pump_android.cc中。

      前面提到,MessagePumpForUI類的成員變量system_message_handler_obj_描述的是一個Java層的SystemMessageHandler對象,這裡通過函數Java_SystemMessageHandler_scheduleWork調用它的成員函數scheduleWork。

      Java層的SystemMessageHandler類的成員函數scheduleWork的實作如下所示:

class SystemMessageHandler extends Handler {
......
@SuppressWarnings("unused")
@CalledByNative
private void scheduleWork() {
sendEmptyMessage(SCHEDULED_WORK);
}
......
}
           

       這個函數定義在檔案external/chromium_org/base/android/java/src/org/chromium/base/SystemMessageHandler.java中。

       SystemMessageHandler類的成員函數scheduleWork調用從父類Handler繼承下來的成員函數sendEmptyMessage向Java層的消息隊列發送一個類型為SCHEDULED_WORK的消息,該消息最終地在SystemMessageHandler類的成員函數handleMessage中得到處理,如下所示:

class SystemMessageHandler extends Handler {
......
// Native class pointer set by the constructor of the SharedClient native class.
private long mMessagePumpDelegateNative = 0;
......
private SystemMessageHandler(long messagePumpDelegateNative) {
mMessagePumpDelegateNative = messagePumpDelegateNative;
}
@Override
public void handleMessage(Message msg) {
......
nativeDoRunLoopOnce(mMessagePumpDelegateNative, mDelayedScheduledTimeTicks);
}
......
}
           

       這個函數定義在檔案external/chromium_org/base/android/java/src/org/chromium/base/SystemMessageHandler.java中。

       SystemMessageHandler類的成員變量mMessagePumpDelegateNative是在構造函數中初始化的,從前面分析的MessagePumpForUI的成員函數Start可以知道,它指向的是一個Native層的MessageLoopForUI對象。

       SystemMessageHandler類的成員函數handleMessage調用JNI函數nativeDoRunLoopOnce通知成員變量mMessagePumpDelegateNative描述的一個Native層的MessageLoopForUI對象,它的消息隊列有新的消息需要處理。

       SystemMessageHandler類的JNI函數nativeDoRunLoopOnce由Naitve層的函數Java_com_android_org_chromium_base_SystemMessageHandler_nativeDoRunLoopOnce實作,如下所示:

static void DoRunLoopOnce(JNIEnv* env, jobject jcaller,
jlong messagePumpDelegateNative,
jlong delayedScheduledTimeTicks);
__attribute__((visibility("default")))
void
Java_com_android_org_chromium_base_SystemMessageHandler_nativeDoRunLoopOnce(JNIEnv*
env, jobject jcaller,
jlong messagePumpDelegateNative,
jlong delayedScheduledTimeTicks) {
return DoRunLoopOnce(env, jcaller, messagePumpDelegateNative,
delayedScheduledTimeTicks);
}
           

       這個函數定義在編譯時自動生成的檔案out/target/product/generic/obj/GYP/shared_intermediates/base/jni/SystemMessageHandler_jni.h中。

       函數Java_com_android_org_chromium_base_SystemMessageHandler_nativeDoRunLoopOnce調用了另外一個函數DoRunLoopOnce通知UI線程在Native的消息隊列有新的消息需要處理。

       函數DoRunLoopOnce的實作如下所示:

static void DoRunLoopOnce(JNIEnv* env, jobject obj, jlong native_delegate,
jlong delayed_scheduled_time_ticks) {
base::MessagePump::Delegate* delegate =
reinterpret_cast<base::MessagePump::Delegate*>(native_delegate);
......
bool did_work = delegate->DoWork();
......
base::TimeTicks next_delayed_work_time;
did_work |= delegate->DoDelayedWork(&next_delayed_work_time);
if (!next_delayed_work_time.is_null()) {
......
if (delayed_scheduled_time_ticks == 0 ||
next_delayed_work_time < base::TimeTicks::FromInternalValue(
delayed_scheduled_time_ticks)) {
Java_SystemMessageHandler_scheduleDelayedWork(env, obj,
next_delayed_work_time.ToInternalValue(),
(next_delayed_work_time -
base::TimeTicks::Now()).InMillisecondsRoundedUp());
}
}
......
if (did_work)
return;
delegate->DoIdleWork();
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_pump_android.cc中。

       函數DoRunLoopOnce首先是将參數native_delegate轉化為一個Native層的base::MessagePump::Delegate對象。在我們這個情景中,這個base::MessagePump::Delegate對象實際上是一個MessageLoopForUI對象。

       接下來函數DoRunLoopOnce執行的工作與前面分析的MessagePumpDefault類的成員函數Run是類似的,差別主要在于:

       1. 函數DoRunLoopOnce隻執行一次循環,而MessagePumpDefault類的成員函數Run執行的是無限循環。

       2. 函數DoRunLoopOnce不能夠進入睡眠狀态,如果它有一個延遲處理的消息,那麼需要通過函數Java_SystemMessageHandler_scheduleDelayedWork調用Java層的SystemMessageHandler類的成員函數scheduleDelayedWork來進行排程,而MessagePumpDefault類的成員函數Run可以自行進入睡眠狀态來排程延遲處理消息的執行。

      至此,我們就分析完成Android應用程式的UI線程是如何實作Chromium的消息循環了。接下來我們繼續分析另外一個特殊的消息循環,那就是負責執行IPC的IO線程的消息循環。

      在前面分析的BrowserMainRunnerImpl類的成員函數Initialize中,除了我們提到它會調用BrowserMainLoop類的成員函數EarlyInitialization和MainMessageLoopStart來執行一些與UI線程相關的工作之外,還會調用BrowserMainLoop類的成員函數CreateStartupTasks來執行其它的啟動任務,如下所示:

class BrowserMainRunnerImpl : public BrowserMainRunner {
public:
......
virtual int Initialize(const MainFunctionParams& parameters) OVERRIDE {
......
if (!initialization_started_) {
......
}
main_loop_->CreateStartupTasks();
int result_code = main_loop_->GetResultCode();
if (result_code > 0)
return result_code;
// Return -1 to indicate no early termination.
return -1;
}
......
};
           

      這個函數定義在檔案external/chromium_org/content/browser/browser_main_runner.cc中。

      BrowserMainLoop類的成員函數CreateStartupTasks執行的啟動任務很多,這裡我們隻關心與IO線程相關的任務,如下所示:

void BrowserMainLoop::CreateStartupTasks() {
......
if (!startup_task_runner_.get()) {
startup_task_runner_ = make_scoped_ptr(new StartupTaskRunner(
base::Bind(&BrowserStartupComplete),
base::MessageLoop::current()->message_loop_proxy()));
......
StartupTask create_threads =
base::Bind(&BrowserMainLoop::CreateThreads, base::Unretained(this));
startup_task_runner_->AddTask(create_threads);
......
if (BrowserMayStartAsynchronously()) {
startup_task_runner_->StartRunningTasksAsync();
}
}
if (!BrowserMayStartAsynchronously()) {
......
startup_task_runner_->RunAllTasksNow();
}
}
           

       這個函數定義在檔案external/chromium_org/content/browser/browser_main_loop.cc中。

       BrowserMainLoop類的成員函數CreateStartupTasks首先是會建立一個StartupTaskRunner對象,并且儲存在成員變量startup_task_runner_中。這個StartupTaskRunner對象封裝了目前線程的一個消息循環,是以通過它可以向目前線程的消息隊列發送消息。目前線程即為Android應用程式的UI線程,是以有了這個StartupTaskRunner對象之後,接下來可以向UI線程的消息隊列發送消息。

       BrowserMainLoop類的成員函數CreateStartupTasks建立了一個用來建立線程的StartupTask,這個StartupTask綁定的函數為BrowserMainLoop類的成員函數CreateThreads,并且會儲存在前面建立的一個StartupTaskRunner的内部等待執行。

       最後,取決于Android應用程式的UI線程是使用同步還是異步方式來啟動WebView,BrowserMainLoop類的成員函數CreateStartupTasks使用不同的方式來執行儲存在成員變量startup_task_runner_指向的一個StartupTaskRunner對象中的StartupTask:

       1. 如果是使用同步方式啟動WebView,那麼就調用上述StartupTaskRunner對象的成員函數RunAllTasksNow來執行儲存在它裡面的各個StartupTask對象的成員函數Run來執行它們。

       2. 如果是使用異步方式啟動WebView,那麼就調用上述StartupTaskRunner對象的成員函數StartRunningTasksAsync向UI線程的消息隊列發送一個消息,當該消息被處理時,再執行儲存在上述StartupTaskRunner對象裡面的各個StartupTask對象的成員函數Run。

       無論是同步方式,還是異步方式,最終都會在UI線程調用BrowserMainLoop類的成員函數CreateThreads來建立一系列線程,如下所示:

int BrowserMainLoop::CreateThreads() {
......
base::Thread::Options default_options;
base::Thread::Options io_message_loop_options;
io_message_loop_options.message_loop_type = base::MessageLoop::TYPE_IO;
......
for (size_t thread_id = BrowserThread::UI + 1;
thread_id < BrowserThread::ID_COUNT;
++thread_id) {
scoped_ptr<BrowserProcessSubThread>* thread_to_start = NULL;
base::Thread::Options* options = &default_options;
switch (thread_id) {
......
case BrowserThread::IO:
......
thread_to_start = &io_thread_;
options = &io_message_loop_options;
break;
......
default:
NOTREACHED();
break;
}
BrowserThread::ID id = static_cast<BrowserThread::ID>(thread_id);
if (thread_to_start) {
(*thread_to_start).reset(new BrowserProcessSubThread(id));
(*thread_to_start)->StartWithOptions(*options);
}
......
}
......
return result_code_;
}
           

      這個函數定義在檔案external/chromium_org/content/browser/browser_main_loop.cc中。

      BrowserMainLoop類的成員函數CreateThreads建立了很多線程,每一個線程都有專門的作用。這些線程的作用可以參考以下的枚舉類型ID的定義:

class CONTENT_EXPORT BrowserThread {
public:
// An enumeration of the well-known threads.
// NOTE: threads must be listed in the order of their life-time, with each
// thread outliving every other thread below it.
enum ID {
// The main thread in the browser.
UI,
// This is the thread that interacts with the database.
DB,
// This is the thread that interacts with the file system.
FILE,
// Used for file system operations that block user interactions.
// Responsiveness of this thread affect users.
FILE_USER_BLOCKING,
// Used to launch and terminate Chrome processes.
PROCESS_LAUNCHER,
// This is the thread to handle slow HTTP cache operations.
CACHE,
// NOTE: do not add new threads here that are only used by a small number of
// files. Instead you should just use a Thread class and pass its
// MessageLoopProxy around. Named threads there are only for threads that
// are used in many places.
// This identifier does not represent a thread. Instead it counts the
// number of well-known threads. Insert new well-known threads before this
// identifier.
ID_COUNT
};
......
};
           

       這個枚舉類型定義在檔案external/chromium_org/content/public/browser/browser_thread.h中。

       回到BrowserMainLoop類的成員函數CreateThreads中,我們隻關注IO線程的建立過程。這個IO線程使用一個BrowserProcessSubThread對象來描述,并且通過調用該BrowserProcessSubThread對象的成員函數StartWithOptions來啟動。

       BrowserProcessSubThread類是從BrowserThreadImpl類繼承下來的。從前面的分析又可以知道,BrowserThreadImpl類又是從base::Thread類繼承下來的。是以,Android應用程式中用來負責執行IPC的IO線程實際上是通過Thread類的成員函數StartWithOptions來建立,并且在建立的時候,指定建立的消息循環的類型為base::MessageLoop::TYPE_IO。

       從前面分析的MessageLoop類的成員函數CreateMessagePumpForType的實作可以知道,類型為base::MessageLoop::TYPE_IO的消息循環使用的消息泵的通過類MessagePumpLibevent來描述。

       MessagePumpLibevent類是從MessagePump類繼承下來的,它的定義如下所示:

class BASE_EXPORT MessagePumpLibevent : public MessagePump {
public:
......
MessagePumpLibevent();
......
// MessagePump methods:
virtual void Run(Delegate* delegate) OVERRIDE;
virtual void Quit() OVERRIDE;
virtual void ScheduleWork() OVERRIDE;
virtual void ScheduleDelayedWork(const TimeTicks& delayed_work_time) OVERRIDE;
private:
......
// Libevent dispatcher. Watches all sockets registered with it, and sends
// readiness callbacks when a socket is ready for I/O.
event_base* event_base_;
// ... write end; ScheduleWork() writes a single byte to it
int wakeup_pipe_in_;
// ... read end; OnWakeup reads it and then breaks Run() out of its sleep
int wakeup_pipe_out_;
// ... libevent wrapper for read end
event* wakeup_event_;
......
};
           

       這個類定義在檔案external/chromium_org/base/message_loop/message_pump_libevent.h中。

       MessagePumpLibevent類與前面分析的MessagePumpDefault類實作消息循環的最大差別是,前者通過Libevent實作線程睡眠與喚醒,而後者是通過條件變量實作線程睡眠與喚醒的。

       Libevent在Android平台上實際上就是封裝了由Linux核心提供的epoll機制。如果讀過 Android應用程式消息處理機制(Looper、Handler)分析這篇文章,Android應用程式使用的的消息循環機是基于epoll機制實作的。是以,Chromium裡面的IO線程的消息循環機制與Android應用程式的消息循環機制的實作是很相似的。接下來我們就簡單分析Chromium裡面的IO線程的消息循環的實作,主要就是分析MessagePumpLibevent類的構造函數、成員函數Run和ScheduleWork的實作。

       MessagePumpLibevent類的構造函數的實作如下所示:

MessagePumpLibevent::MessagePumpLibevent()
: .....
event_base_(event_base_new()),
wakeup_pipe_in_(-1),
wakeup_pipe_out_(-1) {
if (!Init())
NOTREACHED();
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_pump_libevent.cc中。

       MessagePumpLibevent類的構造函數主要就是調用成員函數Init初始化Libevent,如下所示:

bool MessagePumpLibevent::Init() {
int fds[2];
if (pipe(fds)) {
......
return false;
}
......
wakeup_pipe_out_ = fds[0];
wakeup_pipe_in_ = fds[1];
wakeup_event_ = new event;
event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
OnWakeup, this);
event_base_set(event_base_, wakeup_event_);
if (event_add(wakeup_event_, 0))
return false;
return true;
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_pump_libevent.cc中。

       MessagePumpLibevent類的成員函數Init首先是創建立一個管道,并且分别将該管道的讀端和寫端檔案描述符儲存在成員變量wakeup_pipe_out_和wakeup_pipe_in_中。

       接下來,MessagePumpLibevent類的成員函數Init建立了一個Libevent裡面的event,儲存在成員變量wakeup_event_中,并且通過Libevent提供的函數event_set指定該event是用來監控檔案描述wakeup_pipe_out_的EV_READ和EV_PERSIST事件的,同時指定上述事件發生時,就調用MessagePumpLibevent類的靜态成員函數OnWakeup。這相當于是建立了一個epoll裡面的epoll_event。

       再接下來,MessagePumpLibevent類的成員函數Init通過Libevent提供的函數event_base_set建立了一個event_base。這相當于是通過epoll提供的函數epoll_create建立了一個epoll檔案描述符。

       最後,MessagePumpLibevent類的成員函數Init通過調用Libevent提供函數event_add将前面建立的event加入到前面建立的event_base裡面去,以便可以對指定的IO事件進行監控。這相當于是調用了epoll提供的函數epoll_ctl。

       接下來,我們繼續分析MessagePumpLibevent類的成員函數Run的實作,如下所示:

void MessagePumpLibevent::Run(Delegate* delegate) {
......
scoped_ptr<event> timer_event(new event);
for (;;) {
......
bool did_work = delegate->DoWork();
if (!keep_running_)
break;
event_base_loop(event_base_, EVLOOP_NONBLOCK);
did_work |= processed_io_events_;
processed_io_events_ = false;
if (!keep_running_)
break;
did_work |= delegate->DoDelayedWork(&delayed_work_time_);
if (!keep_running_)
break;
if (did_work)
continue;
did_work = delegate->DoIdleWork();
if (!keep_running_)
break;
if (did_work)
continue;
// EVLOOP_ONCE tells libevent to only block once,
// but to service all pending events when it wakes up.
if (delayed_work_time_.is_null()) {
event_base_loop(event_base_, EVLOOP_ONCE);
} else {
TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
if (delay > TimeDelta()) {
struct timeval poll_tv;
poll_tv.tv_sec = delay.InSeconds();
poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
event_set(timer_event.get(), -1, 0, timer_callback, event_base_);
event_base_set(event_base_, timer_event.get());
event_add(timer_event.get(), &poll_tv);
event_base_loop(event_base_, EVLOOP_ONCE);
event_del(timer_event.get());
} else {
// It looks like delayed_work_time_ indicates a time in the past, so we
// need to call DoDelayedWork now.
delayed_work_time_ = TimeTicks();
}
}
}
}
           

      這個函數定義在檔案external/chromium_org/base/message_loop/message_pump_libevent.cc中。

      與MessagePumpDefault類的成員函數Run相比,MessagePumpLibevent類的成員函數Run的執行流程是相似的,主要差別在于:

      1. MessagePumpLibevent類通過調用Libevent提供的函數event_base_loop使得線程進入睡眠狀态,而MessagePumpDefault類通過條件變量使得程進入睡眠狀态。

      2. MessagePumpLibevent除了用來監控消息隊列有無新增消息之後,還用來監控指定的檔案描述符的IO事件,如以下代碼所示:

event_base_loop(event_base_, EVLOOP_NONBLOCK);
did_work |= processed_io_events_;
processed_io_events_ = false;
if (!keep_running_)
break;
           

       這段代碼調用Libevent提供的函數event_base_loop檢查其它指定監控的檔案描述是有IO事件發生。如果有發生的話,就調用它們指定的回調函數進行處理。注意,這裡調用函數event_base_loop時,第二個參數指定為EVLOOP_NONBLOCK,表示在沒有IO事件發生的情況下,不會阻塞目前線程的執行。

       在調用函數event_base_loop時,如果第二個參數指定為EVLOOP_ONCE,則表示在沒有IO事件發生的情況下,會阻塞目前線程的執行,直到有IO事件發生,或者指定的阻塞時間逾時為止。這相當于是調用了epoll提供的函數epoll_wait。

        接下來我們繼續分析MessagePumpLibevent類的成員函數ScheduleWork的實作,如下所示:

void MessagePumpLibevent::ScheduleWork() {
// Tell libevent (in a threadsafe way) that it should break out of its loop.
char buf = 0;
int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
DCHECK(nwrite == 1 || errno == EAGAIN)
<< "[nwrite:" << nwrite << "] [errno:" << errno << "]";
}
           

      這個函數定義在檔案external/chromium_org/base/message_loop/message_pump_libevent.cc中。

      MessagePumpLibevent類的成員函數ScheduleWork向成員變量wakeup_pipe_in_描述的管道寫入一個字元,從前面分析的MessagePumpLibevent類的成員函數Init可以知道,這将會導緻MessagePumpLibevent類的靜态成員函數OnWakeup被調用,如下所示:

void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
MessagePumpLibevent* that = static_cast<MessagePumpLibevent*>(context);
DCHECK(that->wakeup_pipe_out_ == socket);
// Remove and discard the wakeup byte.
char buf;
int nread = HANDLE_EINTR(read(socket, &buf, 1));
DCHECK_EQ(nread, 1);
that->processed_io_events_ = true;
// Tell libevent to break out of inner loop.
event_base_loopbreak(that->event_base_);
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_pump_libevent.cc中。

       MessagePumpLibevent類的靜态成員函數OnWakeup首先是調用函數read将前面寫入到管道的字元讀取出來,并且調用Libevent提供的函數event_base_loopbreak使得MessagePumpLibevent類的成員函數Run可以從函數event_base_loop傳回,以便可以去處理消息隊列的消息。

       前面提到,MessagePumpLibevent除了用來監控消息隊列之外,還可以用來監控指定檔案描述符的IO事件。這是通過調用MessagePumpLibevent類的成員函數WatchFileDescriptor實作的,如下所示:

bool MessagePumpLibevent::WatchFileDescriptor(int fd,
bool persistent,
int mode,
FileDescriptorWatcher *controller,
Watcher *delegate) {
......
int event_mask = persistent ? EV_PERSIST : 0;
if (mode & WATCH_READ) {
event_mask |= EV_READ;
}
if (mode & WATCH_WRITE) {
event_mask |= EV_WRITE;
}
scoped_ptr<event> evt(controller->ReleaseEvent());
if (evt.get() == NULL) {
// Ownership is transferred to the controller.
evt.reset(new event);
} else {
// Make sure we don't pick up any funky internal libevent masks.
int old_interest_mask = evt.get()->ev_events &
(EV_READ | EV_WRITE | EV_PERSIST);
// Combine old/new event masks.
event_mask |= old_interest_mask;
// Must disarm the event before we can reuse it.
event_del(evt.get());
if (EVENT_FD(evt.get()) != fd) {
......
return false;
}
}
// Set current interest mask and message pump for this event.
event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller);
// Tell libevent which message pump this socket will belong to when we add it.
if (event_base_set(event_base_, evt.get())) {
return false;
}
// Add this socket to the list of monitored sockets.
if (event_add(evt.get(), NULL)) {
return false;
}
// Transfer ownership of evt to controller.
controller->Init(evt.release());
controller->set_watcher(delegate);
controller->set_pump(this);
return true;
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_pump_libevent.cc中。

       各個參數的含義如下所示:

       1. fd:要監控其IO事件的檔案描述符。

       2. persistent:是否要持續監控參數mode描述的IO事件。

       3. mode:具體的IO事件,例如讀寫事件等。

       4. controller:指向一個負責接收IO事件通知的FileDescriptorWatcher對象。

       5. delegate:指向一個Watcher對象,參數controller将接收到的IO事件轉發給它處理。

       MessagePumpLibevent類的成員函數WatchFileDescriptor首先是根據參數persistent和mode初始化好一個event_mask,接下來調用event_set設定一個代表IO監控事件的event時要用到。

       MessagePumpLibevent類的成員函數WatchFileDescriptor接下來調用參數controller描述的一個FileDescriptorWatcher對象的成員函數ReleaseEvent檢查其内部是否提供了一個event。如果沒有提供,那麼建立一個新的event,以便用來監控檔案描述符fd的IO事件。如果有提供,則複用它。複用不僅僅是event對象本身,還包括該event對象原來設定的event_mask。同時,能夠複用有一個前提,就是被複用的event關聯的檔案描述符必須要與參數fd描述的檔案描述符一緻。

       MessagePumpLibevent類的成員函數WatchFileDescriptor接下來調用Libevent提供的函數event_set重新設定前面獲得的event的屬性,包括它要監控的檔案描述符、要監控的具體IO事件、以及監控的IO事件發生時的回調函數等。從這裡就可以看到,當檔案描述符fd指定的IO事件發生時,MessagePumpLibevent類的靜态成員函數OnLibeventNotification就會被調用,并且會獲得參數controller指向的一個FileDescriptorWatcher對象。

       MessagePumpLibevent類的成員函數WatchFileDescriptor接下來調用Libevent提供的函數event_base_set和event_add将前面已經設定好屬性的event增加到成員變量event_base_描述的一個事件監控對象中去。

       MessagePumpLibevent類的成員函數WatchFileDescriptor最後将用來描述IO事件監控的event、負責處理IO事件的Watcher和以及目前正在處理的一個MessagePumpLibevent對象設定到參數controller描述的一個FileDescriptorWatcher對象的内部去,以便該FileDescriptorWatcher對象在接收到IO事件通知時可以進行相應的處理。

       接下來我們再看被監控的檔案描述符發生指定的IO事件時的處理流程,即MessagePumpLibevent類的靜态成員函數OnLibeventNotification的實作,如下所示:

void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
void* context) {
WeakPtr<FileDescriptorWatcher> controller =
static_cast<FileDescriptorWatcher*>(context)->weak_factory_.GetWeakPtr();
DCHECK(controller.get());
MessagePumpLibevent* pump = controller->pump();
pump->processed_io_events_ = true;
if (flags & EV_WRITE) {
controller->OnFileCanWriteWithoutBlocking(fd, pump);
}
// Check |controller| in case it's been deleted in
// controller->OnFileCanWriteWithoutBlocking().
if (controller.get() && flags & EV_READ) {
controller->OnFileCanReadWithoutBlocking(fd, pump);
}
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_pump_libevent.cc中。

       從前面的分析可以知道,參數context指向的一個FileDescriptorWatcher對象,是以MessagePumpLibevent類的靜态成員函數OnLibeventNotification首先是将它強制轉化為一個FileDescriptorWatcher對象,并且獲得一個引用了它的WeakPtr弱智能指針。關于WeakPtr弱智能指針的實作和使用方式,可以參考前面 Chromium和WebKit的智能指針實作原理分析一文。

       這裡為什麼要獲得參數context指向的一個FileDescriptorWatcher對象的一個弱智能指針呢?這是因為後面調用它的成員函數OnFileCanWriteWithoutBlocking時,然後該成員函數OnFileCanWriteWithoutBlocking将IO事件分發給它内部的一個Watcher處理時,該FileDescriptorWatcher對象可能會被銷毀。為了不阻止該FileDescriptorWatcher對象銷毀,于是就使用WeakPtr弱智能指針引用它了。

       MessagePumpLibevent類的靜态成員函數OnLibeventNotification接下來調用前面獲得的FileDescriptorWatcher對象的成員函數pump獲得一個與它關聯的MessagePumpLibevent對象,并且将它的成員變量processed_io_events_的值設定為true,表示關聯的MessagePumpLibevent對象有新的IO事件需要處理。這個設定将會影響到前面分析的MessagePumpLibevent類的成員函數Run的運作,因為在這種情況下,MessagePumpLibevent類的成員函數Run不能夠去處理延遲消息、也不能執行Idle Work以及進入睡眠等待狀态,而是要馬上重新執行一次for循環,以及檢查有沒有更多的需要馬上處理IO事件需要處理。

       MessagePumpLibevent類的靜态成員函數OnLibeventNotification最後就通過參數flags檢查具體發生的IO事件,并且執行相應的處理:

       1. 如果發生的是寫事件,那麼就調用參數context指向的一個FileDescriptorWatcher對象的成員函數OnFileCanWriteWithoutBlocking進行處理。

       2. 如果發生的是讀事件,那麼就調用參數context指向的一個FileDescriptorWatcher對象的成員函數OnFileCanReadWithoutBlocking進行處理。如上所述,前面在處理寫事件的過程中,有可能參數context指向的一個FileDescriptorWatcher對象已經被銷毀,是以,這裡要先調用一下WeakPtr弱智能指針controller的成員函數get判斷它是否真的已經被銷毀。如果已經被銷毀,那麼就不需要調用它的成員函數OnFileCanReadWithoutBlocking了。

       FileDescriptorWatcher類的成員函數OnFileCanWriteWithoutBlocking和OnFileCanReadWithoutBlocking的實作如下所示:

void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanReadWithoutBlocking(
int fd, MessagePumpLibevent* pump) {
// Since OnFileCanWriteWithoutBlocking() gets called first, it can stop
// watching the file descriptor.
if (!watcher_)
return;
pump->WillProcessIOEvent();
watcher_->OnFileCanReadWithoutBlocking(fd);
pump->DidProcessIOEvent();
}
void MessagePumpLibevent::FileDescriptorWatcher::OnFileCanWriteWithoutBlocking(
int fd, MessagePumpLibevent* pump) {
DCHECK(watcher_);
pump->WillProcessIOEvent();
watcher_->OnFileCanWriteWithoutBlocking(fd);
pump->DidProcessIOEvent();
}
           

       這兩個函數定義在檔案external/chromium_org/base/message_loop/message_pump_libevent.cc中。

       從這裡就可以看到,FileDescriptorWatcher類的成員函數OnFileCanWriteWithoutBlocking和OnFileCanReadWithoutBlocking隻是簡單地将接收到的IO事件通知轉發給成員變量watcher_描述的一個Watcher對象處理。在轉發前後,它們也會分别調用MessagePumpLibevent類的成員函數WillProcessIOEvent和DidProcessIOEvent通知關聯的MessagePumpLibevent對象将有IO事件被處理以及IO事件已處理完畢。

       這裡需要注意的一點是,如果一個檔案描述符同時發生了讀事件和寫事件,那麼如前所述,先處理寫事件,再處理讀事件。這樣就有可能在處理寫事件的時候,關聯的FileDescriptorWatcher對象的成員變量watcher_指向的Watcher對象被銷毀了,是以在處理讀事件的時候,需要先判斷成員變量watcher_的值是否為NULL。如果為NULL,那麼就意味着它之前指向的Watcher對象被銷毀了,于是就不用往下處理了。

       以上就是通過MessagePumpLibevent類實作消息循環的原理,它與Android應用程式使用的消息循環的實作原理是一樣的,是以這裡我們并沒有很深入地對它進行分析,例如沒有深入到Libevent内部去分析,有興趣的同學可以參考前面 Android應用程式消息處理機制(Looper、Handler)分析一文。

       IO線程的消息循環之是以要通過MessagePumpLibevent類來實作消息循環,是因為它的消息循環主要是用來監控一個負責執行IPC的UNIX Socket的,也就是說,Chromium的IPC是通過UNIX Socket進行的。這樣當一個程序向另外一個程序發送消息時,就會觸發使用的UNIX Socket發生IO事件,然後就會被IO線程的消息循環監控到,最後就可以得到處理。

       至此,關于Chromium的線程消息循環我們就分析完畢,但是關于消息發送,還有一些特性值得進一步分析,主要是關于消息的發送接口的。前面我們分析消息發送接口都是通過MessageLoop提供的。也就是說,在往一個線程的消息隊列發送消息之前,我們首先要獲得這個線程的消息循環,這是通過調用Thread類的成員函數message_loop獲得的,如下所示:

class BASE_EXPORT Thread : PlatformThread::Delegate {
public:
......
MessageLoop* message_loop() const { return message_loop_; }
......
};
           

       這個函數定義在檔案external/chromium_org/base/threading/thread.h中。

       通過調用Thread類的成員函數message_loop直接擷取線程的關聯的MessageLoop對象會有一個問題,我們以後通過該MessageLoop對象發送消息時,不能保證該MessageLoop對象是有效的,因為線程有可能退出了,這會導緻其關聯的MessageLoop對象被銷毀了。

       是以,我們需要有一種機制,即使是線程退出了,我們也可以繼續持有一個消息發送接口。該消息發送接口能夠保證,如果線程還沒有退出,那麼就能正常地向它發送消息。另一方面,如果線程已經退出,那麼最多就是執行一空操作,但是不會造成非法記憶體通路。

       學習過 Chromium和WebKit的智能指針實作原理分析這篇文章之後,我們很容易想到,可以通過scoped_refptr智能指針來實作這種機制。Thread類提供了一個成員函數message_loop_proxy,可以獲得線程的一個消息發送代理接口,即一個MessageLoopProxy接口,如下所示:

class BASE_EXPORT Thread : PlatformThread::Delegate {
public:
......
scoped_refptr<MessageLoopProxy> message_loop_proxy() const {
return message_loop_ ? message_loop_->message_loop_proxy() : NULL;
}
......
};
           

      這個函數定義在檔案external/chromium_org/base/threading/thread.h中。

      這個MessageLoopProxy接口通過scoped_refptr智能指針引用,是以就能保證它還在使用的時候,不會被銷毀,即使線程已經退出,這樣就能夠避免非法記憶體通路。

      接下來我們就繼續分析MessageLoopProxy接口是如何實作的。從Thread類的成員函數message_loop_proxy可以知道,它傳回給調用者的MessageLoopProxy接口是通過成員變量message_loop_指向的一個MessageLoop對象的成員函數message_loop_proxy獲得的。 

      MessageLoop類的成員函數message_loop_proxy的實作如下所示:

class BASE_EXPORT MessageLoop : public MessagePump::Delegate {
public:
......
scoped_refptr<MessageLoopProxy> message_loop_proxy() {
return message_loop_proxy_;
}
......
private:
......
scoped_refptr<internal::MessageLoopProxyImpl> message_loop_proxy_;
......
};
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.h中。

       MessageLoop類的成員函數message_loop_proxy傳回的是成員變量message_loop_proxy_指向的一個MessageLoopProxyImpl對象。從前面的分析可以知道,這個成員變量是在MessageLoop類的成員函數Init中初始化的,如下所示:

void MessageLoop::Init() {
......
incoming_task_queue_ = new internal::IncomingTaskQueue(this);
message_loop_proxy_ =
new internal::MessageLoopProxyImpl(incoming_task_queue_);
......
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

       MessageLoop類的成員函數Init首先建立了一個用來描述線程消息隊列的一個IncomingTaskQueue,然後再根據這個IncomingTaskQueue建立了一個MessageLoopProxyImpl對象,并且儲存在成員變量message_loop_proxy_中。

       MessageLoopProxyImpl對象的建立過程如下所示:

MessageLoopProxyImpl::MessageLoopProxyImpl(
scoped_refptr<IncomingTaskQueue> incoming_queue)
: incoming_queue_(incoming_queue),
valid_thread_id_(PlatformThread::CurrentId()) {
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_loop_proxy_impl.cc中。

       MessageLoopProxyImpl類的構造函數主要就是将參數incoming_queue指向的一個IncomingTaskQueue對象儲存在成員變量incoming_queue_中。注意,MessageLoopProxyImpl類的成員變量incoming_queue_是一個scoped_refptr智能指針,是以即使它所屬的線程退出了,它所引用的IncomingTaskQueue對象仍然是存在的。

       MessageLoopProxyImpl類的繼承關系如圖4所示:

Chromium多線程模型設計和實作分析

圖4 MessageLoopProxyImpl類繼承圖

       從圖4可以看到,MessageLoopProxyImpl類從TaskRunner類一路繼承下來。TaskRunner類定義了PostTask和PostDelayedTask兩個接口。此外,SequencedTaskRunner類又定義了PostNonNestableTask和PostNonNestableDelayedTask兩個接口。MessageLoopProxyImpl類本身重寫了父類TaskRunner的PostDelayedTask接口以及SequencedTaskRunner類的PostNonNestableDelayedTask接口。這樣就使得MessageLoopProxyImpl類像MessageLoop類一樣,具有PostTask、PostDelayedTask、PostNonNestableTask和PostNonNestableDelayedTask四個消息發送接口。

       TaskRunner類的成員函數PostTask的實作如下所示:

bool TaskRunner::PostTask(const tracked_objects::Location& from_here,
const Closure& task) {
return PostDelayedTask(from_here, task, base::TimeDelta());
}
           

      這個函數定義在檔案external/chromium_org/base/task_runner.cc中。

      從這裡就可以看到,TaskRunner類的成員函數PostTask最終通過調用由子類MessageLoopProxyImpl重寫的接口PostDelayedTask來向線程的消息隊列發送消息。

      SequencedTaskRunner類的成員函數PostNonNestableTask的實作如下所示:

bool SequencedTaskRunner::PostNonNestableTask(
const tracked_objects::Location& from_here,
const Closure& task) {
return PostNonNestableDelayedTask(from_here, task, base::TimeDelta());
}
           

       這個函數定義在檔案external/chromium_org/base/sequenced_task_runner.cc中。

       從這裡也可以看到,SequencedTaskRunner類的成員函數PostNonNestableTask最終通過調用由子類MessageLoopProxyImpl重寫的接口PostNonNestableDelayedTask來向線程的消息隊列發送消息。

       是以,無論我們調用MessageLoopProxyImpl類的哪一個消息發送接口,最終都歸結為調用PostDelayedTask和PostNonNestableDelayedTask這兩個接口,它們的實作如下所示:

bool MessageLoopProxyImpl::PostDelayedTask(
const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay) {
DCHECK(!task.is_null()) << from_here.ToString();
return incoming_queue_->AddToIncomingQueue(from_here, task, delay, true);
}
bool MessageLoopProxyImpl::PostNonNestableDelayedTask(
const tracked_objects::Location& from_here,
const base::Closure& task,
base::TimeDelta delay) {
DCHECK(!task.is_null()) << from_here.ToString();
return incoming_queue_->AddToIncomingQueue(from_here, task, delay, false);
}
           

       這兩個函數定義在檔案external/chromium_org/base/message_loop/message_loop_proxy_impl.cc中。

       MessageLoopProxyImpl類的成員函數PostDelayedTask和PostNonNestableDelayedTask都是通過調用成員變量imcoming_queue_指向的一個IncomingTaskQueue對象的成員函數AddToIncomingQueue向線程的消息隊列發送消息。

       從前面的分析可以知道,IncomingTaskQueue類的成員函數AddToIncomingQueue最終調用了另外一個成員函數PostPendingTask向線程的消息隊列發送消息,如下所示:

bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) {
......
// This should only be called while the lock is taken.
incoming_queue_lock_.AssertAcquired();
if (!message_loop_) {
pending_task->task.Reset();
return false;
}
......
bool was_empty = incoming_queue_.empty();
incoming_queue_.push(*pending_task);
pending_task->task.Reset();
// Wake up the pump.
message_loop_->ScheduleWork(was_empty);
return true;
}
           

      這個函數定義在檔案external/chromium_org/base/message_loop/incoming_task_queue.cc中。

      我們注意到,IncomingTaskQueue類的成員函數PostPendingTask在将消息添加到線程的消息隊列之前,首先會判斷線程的消息循環是否還存在,即判斷成員變量message_loop_的值是否等于NULL。如果等于NULL,那麼就說明線程已經退出了,這時候就什麼也不做就傳回了。

      IncomingTaskQueue類的成員變量message_loop_是在構造函數中初始化的,如下所示:

IncomingTaskQueue::IncomingTaskQueue(MessageLoop* message_loop)
: message_loop_(message_loop),
...... {
}
           

      這個函數定義在檔案external/chromium_org/base/message_loop/incoming_task_queue.cc中。

      現在的重點問題就是,IncomingTaskQueue類的成員變量message_loop_是什麼時候被設定為NULL的呢?也就是它是怎麼知道線程退出的呢?

      當線程退出時,MessageLoop對象會被銷毀,這時候它的析構函數會被調用,如下所示:

MessageLoop::~MessageLoop() {
......
// Tell the incoming queue that we are dying.
incoming_task_queue_->WillDestroyCurrentMessageLoop();
......
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

       MessageLoop類的析構函數會調用成員變量incoming_task_queue_指向的一個IncomingTaskQueue對象的成員函數WillDestroyCurrentMessageLoop通知它線程的消息循環要被銷毀了。

       IncomingTaskQueue類的成員函數WillDestroyCurrentMessageLoop的實作如下所示:

void IncomingTaskQueue::WillDestroyCurrentMessageLoop() {
......
AutoLock lock(incoming_queue_lock_);
message_loop_ = NULL;
}
           

       這個函數定義在檔案external/chromium_org/base/message_loop/incoming_task_queue.cc中。

       這時候IncomingTaskQueue類的成員變量message_loop_就會被設定為NULL,這樣以後再調用IncomingTaskQueue類的成員函數AddToIncomingQueue就再無法向線程的消息隊列發送消息了。

       從圖4還可以看到,我們除了可以使用MessageLoopProxy接口向線程的消息隊列發送消息之外,還可以通過SingleThreadTaskRunner、SequencedTaskRunner和TaskRunner接口向線程的消息隊列發送消息,這一類接口統稱為TaskRunner接口。

       TaskRunner是一個用來執行異步任務接口,我們通過它的成員函數PostTask和PostDelayedTask等可以将一個Closure發送給一個線程或者一個線程池執行。由于TaskRunner可能會将不同的Closure交給不同的線程執行,是以不能保證交給它的Closure的執行順序。TaskRunner唯一能保證的是它不會同步執行交給它的Closure,也就是不會直接調用Closure的成員函數Run。

       SequencedTaskRunner是從TaskRunner繼承下來的,但是它比TaskRunner多出一個額外的保證,就是交給它的Closure是按照一定順序執行的,不會出現兩個Closure同時執行的情況。例如,給出兩個Closure1和Closure2,如果滿足以下三個條件,則能保證Closure2在Closure1之後執行:

       1. Closure1比Closure2先Post給SequencedTaskRunner;

       2. Closure1指定的執行時間點小于等于Closure2指定的執行時間點;

       3. Closure1可嵌套消息循環中執行或者Closure2不可嵌套消息循環中執行。

       保證Closure2在Closure1之後執行,是說Closure1執行完成之後,才執行Closure2,而不隻是說Closure1的開始執行時間點比Closure2的開始執行時間點早。

       SingleThreadTaskRunner是從SequencedTaskRunner繼承下來的,但是它比SequencedTaskRunner又多出一個額外的保證,就是交給它的Closure都是由同一個線程執行的,不會出現一個Closure是在一個線程執行,另一個Closure是在另外一個線程執行的情況。

       我們發現一個帶消息循環的線程完全能夠滿足SingleThreadTaskRunner接口的要求,那麼我們如何獲得一個線程的SingleThreadTaskRunner接口呢?   

       MessageLoop類有一個成員變量thread_task_runner_handle_,如下所示:

class BASE_EXPORT MessageLoop : public MessagePump::Delegate {
......
private:
......
scoped_ptr<ThreadTaskRunnerHandle> thread_task_runner_handle_;
......
};
           

      這個成員變量定義在檔案external/chromium_org/base/message_loop/message_loop.h中。

      MessageLoop類的成員變量thread_task_runner_handle_是在前面我們分析過的MessageLoop類的成員函數Init初始化的,如下所示:

void MessageLoop::Init() {
......
incoming_task_queue_ = new internal::IncomingTaskQueue(this);
message_loop_proxy_ =
new internal::MessageLoopProxyImpl(incoming_task_queue_);
thread_task_runner_handle_.reset(
new ThreadTaskRunnerHandle(message_loop_proxy_));
}
           

      這個成員變量定義在檔案external/chromium_org/base/message_loop/message_loop.cc中。

      MessageLoop類的成員函數Init根據前面建立出來的MessageLoopProxyImpl對象建立了一個ThreadTaskRunnerHandle對象。

      一個ThreadTaskRunnerHandle對象的建立過程如下所示:

base::LazyInstance<base::ThreadLocalPointer<ThreadTaskRunnerHandle> >
lazy_tls_ptr = LAZY_INSTANCE_INITIALIZER;
......
ThreadTaskRunnerHandle::ThreadTaskRunnerHandle(
const scoped_refptr<SingleThreadTaskRunner>& task_runner)
: task_runner_(task_runner) {
......
lazy_tls_ptr.Pointer()->Set(this);
}
           

       這個函數定義在檔案external/chromium_org/base/thread_task_runner_handle.cc中。

       從圖4可以知道,MessageLoopProxyImpl類是從SingleThreadTaskRunner類繼承下來的,是以ThreadTaskRunnerHandle類的構造函數可以接受一個MessageLoopProxyImpl對象作為參數。

       ThreadTaskRunnerHandle類的構造函數做了兩件事情。第一件事情就是将參數task_runner描述的一個SingleThreadTaskRunner對象儲存在成員變量task_runner_中。第二件事情就是将正在建立的ThreadTaskRunnerHandle對象儲存線程局部存儲變量lazy_tls_ptr中。

       ThreadTaskRunnerHandle類還提供了一個靜态成員函數Get,用來獲得儲存線上程局部存儲變量lazy_tls_ptr的一個ThreadTaskRunnerHandle對象的成員變量task_runner_描述的一個SingleThreadTaskRunner對象,如下所示:

scoped_refptr<SingleThreadTaskRunner> ThreadTaskRunnerHandle::Get() {
ThreadTaskRunnerHandle* current = lazy_tls_ptr.Pointer()->Get();
......
return current->task_runner_;
}
           

      這個函數定義在檔案external/chromium_org/base/thread_task_runner_handle.cc中。

      這樣我們就可以獲得一個帶消息循環的線程的SingleThreadTaskRunner接口了,這個接口指向的實際上是一個MessageLoopProxyImpl對象,是以最終實際上是通過前面分析的MessageLoopProxyImpl接口來往線程發送消息。

      從圖4還可以知道,TaskRunner接口提供了一個成員函數PostTaskAndReply,如下所示:

class BASE_EXPORT TaskRunner
: public RefCountedThreadSafe<TaskRunner, TaskRunnerTraits> {
public:
......
bool PostTaskAndReply(const tracked_objects::Location& from_here,
const Closure& task,
const Closure& reply);
......
};
           

       這個函數聲明在檔案external/chromium_org/base/task_runner.h中。

       從TaskRunner類的成員函數PostTaskAndReply的聲明可以推斷出,它用來向一個目标線程請求異步執行一個任務task,并且當該任務執行完成時,向送出請求的線程發送一個reply。它實作的功能正好就是我們在前面圖1所描述的線程雙向異步通信機制。接下來我們就分析它是如何實作的。

       TaskRunner類的成員函數PostTaskAndReply的實作如下所示:

bool TaskRunner::PostTaskAndReply(
const tracked_objects::Location& from_here,
const Closure& task,
const Closure& reply) {
return PostTaskAndReplyTaskRunner(this).PostTaskAndReply(
from_here, task, reply);
}
           

      這個函數定義在檔案external/chromium_org/base/task_runner.cc中。

      TaskRunner類的成員函數PostTaskAndReply首先建立了一個PostTaskAndReplyTaskRunner對象,接着調用這個PostTaskAndReplyTaskRunner對象的成員函數 PostTaskAndReply來實作雙向異步通信機制。 

       PostTaskAndReplyTaskRunner對象的建立過程如下所示:

PostTaskAndReplyTaskRunner::PostTaskAndReplyTaskRunner(
TaskRunner* destination) : destination_(destination) {
DCHECK(destination_);
           

      這個函數定義在檔案external/chromium_org/base/task_runner.cc中。

      PostTaskAndReplyTaskRunner類的構造函數将參數destination描述的一個TaskRunner對象儲存在成員變量destination_中。

      PostTaskAndReplyTaskRunner類是從PostTaskAndReplyImpl類繼承下來的,并且它的成員函數PostTaskAndReply也是從PostTaskAndReplyImpl類繼承下來的,是以前面調用PostTaskAndReplyTaskRunner類的成員函數PostTaskAndReply實際上調用的是PostTaskAndReplyImpl類的成員函數PostTaskAndReply。

      PostTaskAndReplyImpl類的成員函數PostTaskAndReply的實作如下所示:

bool PostTaskAndReplyImpl::PostTaskAndReply(
const tracked_objects::Location& from_here,
const Closure& task,
const Closure& reply) {
PostTaskAndReplyRelay* relay =
new PostTaskAndReplyRelay(from_here, task, reply);
if (!PostTask(from_here, Bind(&PostTaskAndReplyRelay::Run,
Unretained(relay)))) {
delete relay;
return false;
}
return true;
}
           

       這個函數定義在檔案external/chromium_org/base/threading/post_task_and_reply_impl.cc中。

       PostTaskAndReplyImpl類的成員函數PostTaskAndReply先将參數from_here、task和reply封裝在一個PostTaskAndReplyRelay對象中,然後再将調用函數Bind建立一個Closure,并且通過調用由子類PostTaskAndReplyTaskRunner實作的成員函數PostTask執行該Closure,注意,這個Closure綁定的是PostTaskAndReplyRelay類的成員函數Run。

       PostTaskAndReplyRelay對象的建立過程如下所示:

class PostTaskAndReplyRelay {
public:
PostTaskAndReplyRelay(const tracked_objects::Location& from_here,
const Closure& task, const Closure& reply)
: from_here_(from_here),
origin_loop_(ThreadTaskRunnerHandle::Get()) {
task_ = task;
reply_ = reply;
}
......
private:
tracked_objects::Location from_here_;
scoped_refptr<SingleThreadTaskRunner> origin_loop_;
Closure reply_;
Closure task_;
};
           

       這個函數定義在檔案external/chromium_org/base/threading/post_task_and_reply_impl.cc中。

       PostTaskAndReplyRelay類的構造函數主要就是将參數from_here、task和reply描述的對象分别儲存在成員變量from_here_、task_和reply_中。另外,它還會通過我們前面分析過的ThreadTaskRunnerHandle類的靜态成員函數Get獲得一個SingleThreadTaskRunner對象,并且儲存在成員變量origin_loop_中。注意,這個SingleThreadTaskRunner對象是從目前線程獲得的,也就是調用了TaskRunner類的成員函數PostTaskAndReply的線程。

       PostTaskAndReplyTaskRunner類的成員函數PostTask的實作如下所示:

bool PostTaskAndReplyTaskRunner::PostTask(
const tracked_objects::Location& from_here,
const Closure& task) {
return destination_->PostTask(from_here, task);
}
           

      這個函數定義在檔案external/chromium_org/base/task_runner.cc中。

      從前面的分析可以知道,PostTaskAndReplyTaskRunner類的成員變量destination_指向的一個TaskRunner接口。假設這個TaskRunner接口描述的是一個從帶消息循環的線程的獲得的MessageLoopProxyImpl對象,那麼PostTaskAndReplyTaskRunner類的成員函數PostTask就是通過該MessageLoopProxyImpl對象向線程的消息隊列發送參數task描述的一個消息。

       由于參數task指向的是一個Closure對象,并且它綁定的是PostTaskAndReplyRelay類的成員函數Run,是以當上述消息被處理時,PostTaskAndReplyRelay類的成員函數Run就會被調用。

       PostTaskAndReplyRelay類的成員函數Run的實作如下所示:

class PostTaskAndReplyRelay {
public:
......
void Run() {
task_.Run();
origin_loop_->PostTask(
from_here_,
Bind(&PostTaskAndReplyRelay::RunReplyAndSelfDestruct,
base::Unretained(this)));
}
......
}
           

       這個函數定義在檔案external/chromium_org/base/threading/post_task_and_reply_impl.cc中。

       PostTaskAndReplyRelay類的成員函數Run調用了成員變量task_指向的一個Closure對象的成員函數Run。從前面的分析可以知道,該Closure對象就是最初調用TaskRunner類的成員函數PostTaskAndReply所要執行的Task。

       執行完成成員變量task_指向的一個Closure之後,接下來PostTaskAndReplyRelay類的成員函數Run接下來向最初調用了TaskRunner類的成員函數PostTaskAndReply的線程發送一個Closure,該Closure綁定的是目前正在處理的PostTaskAndReplyRelay對象的成員函數RunReplyAndSelfDestruct。

       PostTaskAndReplyRelay類的成員函數RunReplyAndSelfDestruct的實作如下所示:

class PostTaskAndReplyRelay {
......
private:
void RunReplyAndSelfDestruct() {
DCHECK(origin_loop_->BelongsToCurrentThread());
// Force |task_| to be released before |reply_| is to ensure that no one
// accidentally depends on |task_| keeping one of its arguments alive while
// |reply_| is executing.
task_.Reset();
reply_.Run();
// Cue mission impossible theme.
delete this;
}
......
}
           

       這個函數定義在檔案external/chromium_org/base/threading/post_task_and_reply_impl.cc中。

       PostTaskAndReplyRelay類的成員函數RunReplyAndSelfDestruct所做的事情就是在最初調用了TaskRunner類的成員函數PostTaskAndReply的線程中執行成員變量reply_描述的一個Closure對象。從前面的分析可以知道,該Closure對象就是最初用TaskRunner類的成員函數PostTaskAndReply時指定的第三個參數reply所描述的Closure對象。

       最後,PostTaskAndReplyRelay類的成員函數RunReplyAndSelfDestruct将目前正在處理的PostTaskAndReplyRelay對象銷毀掉。這樣,一個雙向的異步通信就執行完成了。

       至此,我們就分析完成Chromium的線程消息循環和消息發送機制了。Chromium的多線程模型正是基于這種線程消息循環和消息發送機制設計和實作的,其最大的特點是一切皆異步通信,進而提高各個線程,特别是UI線程的響應性,進而讓使用者覺得Chromium加載網頁的速度很快。

       最後,我們分别對Chromium在Android平台實作的線程消息循環和消息發送作一個簡要的總結。

       Chromium的線程消息循環根據不同的線程具有不同的實作,具體來說,就是:

       1. UI線程和Java線程的消息循環是通過Java層的消息循環實作的,也就是通過Android應用程式使用的消息循環實作的。

       2. IO線程的消息循環是基于Libevent實作的,也就是通過epoll實作的,這是因為IO線程主要是用來執行IPC,而這種IPC是通過UNIX Socket實作的,這意味IO線程的消息循環主要用來監控UNIX Socket檔案描述符的,是以就适合使用epoll來實作。

       3. 其它類型的線程的消息循環是基于條件變量實作的。

       Chromium的線程消息發送可以通過以下三種接口實作:

       1. SingleThreadTaskRunner、SequencedTaskRunner和TaskRunner,這三個接口是比MessageLoopProxy和MessageLoop更一般的接口,因為它們不關心負責處理消息的線程是如何實作的。

       2. MessageLoopProxy,這個接口比MessageLoop更好用,因為消息的發送者可以一直持有該接口,而不用關心該接口所關聯的線程是否已經退出。

       3. MessageLoop,這個接口要求使用者確定它所關聯的線程是否已經退出,如果已經退出,那麼是不可以使用的。

       了解Chromium的線程消息循環和消息發送機制對了解Chromium的多線程模型非常重要,而在Chromium的源碼裡大量地使用了這些消息循環和消息處理機制。例如,我們接下來的文章中分析的Chromium多程序通信機制就是通過上面提到的IO線程實作的。敬請關注!更多的資訊也可以關注老羅的新浪微網誌: http://weibo.com/shengyangluo。

繼續閱讀