天天看點

多線程 條件變量

作者:王東

1.1       什麼是條件變量和條件等待?

簡單的說:

條件變量(condition variable)是利用線程間共享的全局變量進行同步的一種機制,主要包括兩個動作:一個線程等待某個條件為真,而将自己挂起;另一個線程使的條件成立,并通知等待的線程繼續。為了防止競争,條件變量的使用總是和一個互斥鎖結合在一起。

Wiki中的定義如下:

Conceptually a condition variable is a queue of threads, associated with a monitor, on which a thread may wait for some condition to become true.Thus each condition variable c is associated with an assertion P. While a thread is waiting on a condition variable, that thread is not considered to occupy the monitor, and so other threads may enter the monitor to change the monitor's state. In most types of monitors, these other threads may signal the condition variable c to indicate that assertion P is true in the current state[1].

條件變量(condition variable)是一種特殊的同步變量,它是與一個互斥量(monitor)關聯的線程隊列,條件變量都與一個斷言(assertion) P關聯,因為其中的線程隊列中有一個線程在等待這個斷言P為真。當一個線程處于等待條件變量(condition variable)時,該線程不再占用互斥量(monitor),讓其他線程能夠進入互斥區去改變條件狀态。

在條件變量上有兩種基本操作:

l  等待(wait):一個線程因為等待斷言(assertion) P為真而處于等待在條件變量上,此時線程不會占用互斥量(monitor);

l  通知(signal/notify):另一個線程在使得斷言(assertion) P為真的時候,通知條件變量。

一個線程發生signal時,另一個線程被激活,那麼兩個線程都占用的互斥量(monitor), 選擇哪個線程來占用互斥,這就分為了Blocking condition variables(把優先級給被通知的線程)和Nonblocking condition variables(把優先級給發出signal通知的線程[1]。

使用條件等待有如下的場景:

多線程通路一個互斥區域内的資源,如果擷取資源的條件不夠時,則線程需要等待,直到其他線程釋放資源後,喚醒等待條件,使得線程得以繼續。例如:

Thread1:

Lock (mutex)

while (condition is false) {

//為什麼要在這裡用while而不是if呢?

//參考1.2.1條件變量存在的問題

Cond_wait(cond, mutex, timeout)

}

DoSomething()

Unlock (mutex)

Thread2:

Lock (mutex)

condition is true

Cond_signal(cond)

Unlock (mutex)

例如 Thread1從一個大小為50的連結池中擷取一個連結,如果已經用的連結達到50時,那該線程必須等待一個條件。 Thread2 用完一個連結時,将該連結還給連結池,然後發送條件notify,告訴Thread1 可以繼續了。.

1.1.1        關于條件變量(condition variable)和信号量(Semaphore)

信号量(Semaphore)是一個非負的整數計數器,被用于程序或線程間的同步與互斥。

通過信号量可以實作 “PV操作”這種程序或線程間的同步機制。

P操作是獲得資源,将信号量的值減1,如果結果不為負則繼續執行,線程獲得資源,否則線程被阻塞,處于睡眠狀态,直到等待的資源被别的線程釋放;

V操作則是釋放資源,給信号量的值加1,釋放一個因執行P操作而等待的線程。

最簡單的信号燈形式,信号燈的值隻能取0或1,類似于mutex。

當信号量的值為任意非負值(大于1),其值就代表可用資源的個數。

可以将信号量Semaphore和互斥鎖(mutex)來實作一個來實作對一個池的同步和保護。使用mutex來實作同步,使用semaphore用于實作對資源記數。

獲得資源的線程:

sem_wait (semaphore1)

Lock (mutex)

Unlock (mutex)

sem_post (semaphore2)

釋放資源的線程:

sem_wait (semaphore2)

Lock (mutex)

Unlock (mutex)

sem_post (semaphore1)

這個模型很像多線程的生産者與消費者模型,這裡的semaphore2是為了防止過度釋放。

比起信号量來說,條件變量可以實作更為複雜的等待條件。當然,條件變量和互斥鎖也可以實作信号量的功能(window下的條件變量隻能實作線程同步不能實作程序同步)。

在Posix.1基本原理一文聲稱,有了互斥鎖和條件變量還提供信号量的原因是:“本标準提供信号量的而主要目的是提供一種程序間同步的方式;這些程序可能共享也可能不共享記憶體區。互斥鎖和條件變量是作為線程間的同步機制說明的;這些線程總是共享(某個)記憶體區。這兩者都是已廣泛使用了多年的同步方式。每組原語都特别适合于特定的問題”。盡管信号量的意圖在于程序間同步,互斥鎖和條件變量的意圖在于線程間同步,但是信号量也可用于線程間,互斥鎖和條件變量也可用于程序間。應當根據實際的情況進行決定。信号量最有用的場景是用以指明可用資源的數量[11]。

個人的感覺是:由于起源不同,導緻了兩種理念,一中理念力挺條件變量(condition variable),覺得信号量沒有什麼用(例如POSIX Thread模型中沒有信号量的概念,雖然也提出了Posix Semaphore,但是為什麼一開始不把它放在一起呢?);另一理念恰好相反(例如window剛開始沒有條件變量的概念,隻有信号量的概念)。

進化到後來,目前的linux和window都同時具備了這二者。

1.2       Linux中的條件等待函數是那些?

Linux提供了的條件等待函數和notify函數。

l  pthread_cond_timedwait(cond, mutex, abstime);

l  pthread_cond_wait(cond, mutex);

l  pthread_cond_signal(cond);    将至少解鎖一個線程(阻塞在條件變量上的線程)。

l  pthread_cond_broadcast(cond) : 将對所有阻塞在條件變量上的線程解鎖。

線程1調用pthread_cond_wait() 所做的事 三個部分:

1.         同時對mutex解鎖,

2.         并等待條件 cond 發生

3.         獲得通知後,對mutex加鎖;

調用pthread_cond_wait()後,同時對mutex解鎖,并等待條件 cond 發生(要求解鎖并阻塞是一個原子操作)

現在互斥對象已被解鎖,其它線程可以進入互斥區域,修改條件。

此時,pthread_cond_wait() 調用還未傳回。等待條件 mycond是一個阻塞操作,這意味着線程将睡眠,在它蘇醒之前不會消耗 CPU 周期。直到特定條件發生[3]。

假設另一個線程2對mutex加鎖, 并改變條件, 然後調用函數 pthread_cond_signal() 激活等待條件。這意味着線程1現在将蘇醒。此時線程1試圖對mutex加鎖,由于線程2還沒有對mutex解鎖,是以線程1隻有等待,隻有線上程2對mutex解鎖後,線程1優先獲得mutex加鎖,然後就能做想做的事情了。

這裡是存在問題的:如何讓線程1優先獲得mutex加鎖,而不是其他線程,pthread_mutex_lock 的僞代碼[4]中展示了這種實作的可能性,signal函數中優先激活了wait中的線程。

pthread_cond_wait(mutex, cond):

    value = cond->value;

    pthread_mutex_unlock(mutex);

    pthread_mutex_lock(cond->mutex);

    if (value == cond->value) {

        me->next_cond = cond->waiter;

        cond->waiter = me;

        pthread_mutex_unlock(cond->mutex);

        unable_to_run(me);

    } else

        pthread_mutex_unlock(cond->mutex);

    pthread_mutex_lock(mutex);

pthread_cond_signal(cond):

    pthread_mutex_lock(cond->mutex);

    cond->value++;

    if (cond->waiter) {

        sleeper = cond->waiter;

        cond->waiter = sleeper->next_cond;

        able_to_run(sleeper);

    }

    pthread_mutex_unlock(cond->mutex);

下面的例子展示了使用條件變量的示例代碼[2]:

其中一個或多個線程負責count數增加(inc_count),另一個線程負責監聽count數,一旦達到COUNT_LIMIT,就報告(watch_count)。

void inc_count (void) {

         …

    pthread_mutex_lock(&count_mutex);

    count++;

    if (count == COUNT_LIMIT) {

      pthread_cond_signal(&count_threshold_cv);

      printf("inc_count(): thread %ld, count = %d  Threshold reached./n",

             my_id, count);

      }

    printf("inc_count(): thread %ld, count = %d, unlocking mutex/n",

            my_id, count);

    pthread_mutex_unlock(&count_mutex);

         …

}

void watch_count (void) {

  …

  pthread_mutex_lock(&count_mutex);

  while (count<COUNT_LIMIT) {

    pthread_cond_wait(&count_threshold_cv, &count_mutex);

    printf("watch_count(): thread %ld Condition signal received./n", my_id);

    count += 125;

    printf("watch_count(): thread %ld count now = %d./n", my_id, count);

    }

  pthread_mutex_unlock(&count_mutex);

  …

}

1.2.1        條件變量中存在的問題:虛假喚醒

Linux中幫助中提到的:

在多核處理器下,pthread_cond_signal可能會激活多于一個線程(阻塞在條件變量上的線程)。 On a multi-processor, it may be impossible for an implementation of pthread_cond_signal() to avoid the unblocking of more than one thread blocked on a condition variable.

結果是,當一個線程調用pthread_cond_signal()後,多個調用pthread_cond_wait()或pthread_cond_timedwait()的線程傳回。這種效應成為”虛假喚醒”(spurious wakeup) [4]

The effect is that more than one thread can return from its call to pthread_cond_wait() or pthread_cond_timedwait() as a result of one call to pthread_cond_signal(). This effect is called "spurious wakeup". Note that the situation is self-correcting in that the number of threads that are so awakened is finite; for example, the next thread to call pthread_cond_wait() after the sequence of events above blocks.

雖然虛假喚醒在pthread_cond_wait函數中可以解決,為了發生機率很低的情況而降低邊緣條件(fringe condition)效率是不值得的,糾正這個問題會降低對所有基于它的所有更進階的同步操作的并發度。是以pthread_cond_wait的實作上沒有去解決它。

While this problem could be resolved, the loss of efficiency for a fringe condition that occurs only rarely is unacceptable, especially given that one has to check the predicate associated with a condition variable anyway. Correcting this problem would unnecessarily reduce the degree of concurrency in this basic building block for all higher-level synchronization operations.

是以通常的标準解決辦法是這樣的:

将條件的判斷從if 改為while

多線程 條件變量

pthread_cond_wait中的while()不僅僅在等待條件變量前檢查條件變量,實際上在等待條件變量後也檢查條件變量。

這樣對condition進行多做一次判斷,即可避免“虛假喚醒”.

這就是為什麼在pthread_cond_wait()前要加一個while循環來判斷條件是否為假的原因。

有意思的是這個問題也存在幾乎所有地方,包括: linux 條件等待的描述, POSIX Threads的描述, window API(condition variable), java等等。

l  在linux的幫助中對條件變量的描述是[4]:

添加while檢查的做法被認為是增加了程式的健壯性,在IEEE Std 1003.1-2001中認為spurious wakeup是允許的。

An added benefit of allowing spurious wakeups is that applications are forced to code a predicate-testing-loop around the condition wait. This also makes the application tolerate superfluous condition broadcasts or signals on the same condition variable that may be coded in some other part of the application. The resulting applications are thus more robust. Therefore, IEEE Std 1003.1-2001 explicitly documents that spurious wakeups may occur.

l  在POSIX Threads中[5]:

David R. Butenhof 認為多核系統中 條件競争(race condition [8])導緻了虛假喚醒的發生,并且認為完全消除虛假喚醒本質上會降低了條件變量的操作性能。

 “…, but on some multiprocessor systems, making condition wakeup completely predictable might substantially slow all condition variable operations. The race conditions that cause spurious wakeups should be considered rare”

l  在window的條件變量中[6]:

MSDN幫助中描述為,spurious wakeups問題依然存在,條件需要重複check。

Condition variables are subject to spurious wakeups (those not associated with an explicit wake) and stolen wakeups (another thread manages to run before the woken thread). Therefore, you should recheck a predicate (typically in a while loop) after a sleep operation returns.

l  在Java中 [7],對等待的寫法如下:

synchronized (obj) { 

    while (<condition does not hold>) 

        obj.wait(); 

     ... // Perform action appropriate to condition 

 }

Effective java 曾經提到Item 50: Never invoke wait outside a loop.

顯然,虛假喚醒是個問題,但它也是在JLS的第三版的JDK5的修訂中才得以澄清。在JDK 5的Javadoc進行更新

A thread can also wake up without being notified, interrupted, or timing out, a so-called spurious wakeup. While this will rarely occur in practice, applications must guard against it by testing for the condition that should have caused the thread to be awakened, and continuing to wait if the condition is not satisfied. In other words, waits should always occur in loops.

Apparently, the spurious wakeup is an issue (I doubt that it is a well known issue) that intermediate to expert developers know it can happen but it just has been clarified in JLS third edition which has been revised as part of JDK 5 development. The javadoc of wait method in JDK 5 has also been updated

1.3     Window 裡面的條件等待函數是那些?

比較奇怪的是一直到vista和window2008以前,window居然沒有标準的條件變量的概念。

實作條件變量和條件等待.

Window 采用了一種組合方式的政策來實作條件等待。

1.         使用 mutex 來實作互斥鎖

2.         使用 SignalObjectAndWait實作條件等待;

3.         對autoreset類型的event 使用 PulseEvent實作signal 條件;

事實上,window是用 autoreset的event來實作條件的。

具體的:

Thread1:

WaitForSingleObject(hMutex);

If (condition is false){

         SignalObjectAndWait(hMutex, hEvent);

WaitForSingleObject(hMutex);

}

Dosomething();

ReleaseMutex(hMutex);

Thead2:

WaitForSingleObject(hMutex);

PulseEvent (hEvent);

ReleaseMutex(hMutex);

SignalObjectAndWait做的工作和cond_wait不完全一樣。

SignalObjectAndWait隻做2件事情:

1.         同時對mutex解鎖,

2.         并等待條件 cond 發生

獲得signal通知,直接就往下走了,而不是等待mutex,這一點與Pthread_cond_wait()不同。是以在使用SignalObjectAndWait 後,必須使用lock(mutex)來獲得mutex的鎖,防止兩個線程同時進入mutex。

這裡lock(mutex)是可能存在問題的,因為無法保證這裡的lock(mutex)能優先獲得進入的權利。事實上如果在SignalObjectAndWait()和lock(mutex)(WaitForSingleObject)加sleep(), 就會導緻其他線程先獲得mutex的lock。是以也應該像linux中一樣,使用循環判斷。

Thread1:

WaitForSingleObject(hMutex);

while (condition is false){

         SignalObjectAndWait(hMutex, hEvent);

WaitForSingleObject(hMutex);

}

Dosomething();

ReleaseMutex(hMutex);

1.3.1        PulseEvent在條件等待中存在的問題,如何解決?

實際上PulseEvent 是不可信賴的,因為當一個線程處于等待狀态時,會一些瞬間其狀态并不是等待狀态,這就導緻了PulseEvent()不能激活這些線程。

例如:核心模式的APC調用,會導緻等待的線程瞬間不處于等待狀态。

A thread waiting on a synchronization object can be momentarily removed from the wait state by a kernel-mode APC, and then returned to the wait state after the APC is complete. If the call to PulseEvent occurs during the time when the thread has been removed from the wait state, the thread will not be released because PulseEvent releases only those threads that are waiting at the moment it is called. Therefore, PulseEvent is unreliable and should not be used by new applications. Instead, use condition variables.

當然可以用SetEvent ()來代替PulseEvent(),那将會喚醒是以等待的線程。這樣的喚醒更像是notifyAll,而不是notify.

标準的解決辦法是使用window中condition variables[9].

1.3.2        Window下标準做法

condition variables是微軟從vista和2008以後引入的技術,xp和2003的系統不支援。

condition variables和臨界區一樣,是使用者态調用,不是系統調用(意味着是高效的),隻支援同一個程序内的多線程。

l  WakeConditionVariable           喚醒一個等待條件變量的線程

l  WakeAllConditionVariable      喚醒所有等待條件變量的線程;

l  SleepConditionVariableCS       釋放臨界區鎖和等待條件變量作為原子性操作

l  SleepConditionVariableSRW   釋放SRW鎖和等待條件變量作為原子性操作.

這樣,在window下使用條件變量的方法如下所示:

Thread1:

   EnterCriticalSection(&CritSection);

   while( TestPredicate() == FALSE ){

      SleepConditionVariableCS(&ConditionVar, &CritSection, INFINITE);

   }

   DoSth();

   LeaveCriticalSection(&CritSection);

Thread2:

   EnterCriticalSection(&CritSection);

      WakeConditionVariable (ConditionVar);

   LeaveCriticalSection(&CritSection);

同樣的spurious wakeups問題依然存在,條件需要重複check。

1.3.3        Window下的其他方法?

Window下面使用condition variable固然好,但是對于不能使用condition variable的xp和2003怎麼辦呢。

有兩個辦法:

1 使用pthread porting到window中的pthread win32方法。使用pthread在window中的運作庫。具體可參考:

http://sourceware.org/pthreads-win32/

2 自己寫一個,或者利用他人寫好的條件變量

可參考:

a)       A Fair Monitor (Condition Variables) Implementation for Win32

http://thbecker.net/free_software_utilities/fair_monitor_for_win32/start_page.html

b)       Windows下條件變量的實作

http://blog.csdn.net/leafarmy/archive/2009/03/31/4039548.aspx

1.4       參考:

1.Monitor (synchronization) Condition variable

http://en.wikipedia.org/wiki/Condition_variable

2 POSIX Threads Programming

https://computing.llnl.gov/tutorials/pthreads/#ConditionVariables

3 pthread_cond_wait()太難了解了

http://hi.baidu.com/nkhzj/blog/item/f5480d4f740f7f35aec3ab4b.html

4 pthread_cond_signal(3) - Linux man page

http://linux.die.net/man/3/pthread_cond_signal

5 POSIX Threads-Spurious wakeup

http://en.wikipedia.org/wiki/Spurious_wakeup

6 Condition Variables

http://msdn.microsoft.com/en-us/library/ms682052(v=VS.85).aspx

7 java-的spurious wakeup問題

http://www.devguli.com/blog/eng/spurious-wakeup/

8 Race condition

http://en.wikipedia.org/wiki/Race_condition

9 PulseEvent

http://msdn.microsoft.com/en-us/library/ms684914(VS.85).aspx

10 windows - Condition Variables

http://msdn.microsoft.com/en-us/library/ms682052(v=VS.85).aspx

11 程序間的通信(互斥鎖、條件變量、讀寫鎖、檔案鎖、信号燈)

http://blog.csdn.net/ccskyer/archive/2010/12/24/6096710.aspx

12 pthread_cond_wait的spurious wakeup問題

http://blog.chinaunix.net/u/12592/showart_2213910.html

繼續閱讀