天天看點

boost::thread類

前言 

标準C++線程即将到來。預言它将衍生自Boost線程庫,現在讓我們探索一下Boost線程庫。 

幾年前,用多線程執行程式還是一件非比尋常的事。然而今天網際網路應用服務程式普遍使用多線程來提高與多客戶連結時的效率;為了達到最大的吞吐量,事務伺服器在單獨的線程上運作服務程式;GUI應用程式将那些費時,複雜的處理以線程的形式單獨運作,以此來保證使用者界面能夠及時響應使用者的操作。這樣使用多線程的例子還有很多。 

但是C++标準并沒有涉及到多線程,這讓程式員們開始懷疑是否可能寫出多線程的C++程式。盡管不可能寫出符合标準的多線程程式,但是程式員們還是會使用支援多線程的作業系統提供的多線程庫來寫出多線程C++程式。但是這樣做至少有兩個問題:這些庫大部分都是用C語言完成的,如果在C++程式中要使用這些庫就必須十分小心;還有,每一個作業系統都有自己的一套支援多線程的類庫。是以,這樣寫出來得代碼是沒有标準可循的,也不是到處都适用的(non-portable)。Boost線程庫就是為了解決所有這些問題而設計的。 

Boost是由C++标準委員會類庫工作組成員發起,緻力于為C++開發新的類庫的組織。現在它已經有近2000名成員。許多庫都可以在Boost源碼的釋出版本中找到。為了使這些類庫是線程安全的(thread-safe),Boost線程庫被建立了。 

Boost線程庫的所有接口設計都是從0開始的,并不是C線程API的簡單封裝。許多C++特性(比如構造函數和析構函數,函數對象(function object)和模闆)都被使用在其中以使接口更加靈活。現在的版本可以在POSIX,Win32和Macintosh Carbon平台下工作。 

建立線程 

就像std::fstream類就代表一個檔案一樣,boost::thread類就代表一個可執行的線程。預設構造函數建立一個代表目前執行線程的執行個體。一個重載的構造函數以一個不需任何參數的函數對象作為參數,并且沒有傳回值。這個構造函數建立一個新的可執行線程,它調用了那個函數對象。 

起先,大家認為傳統C建立線程的方法似乎比這樣的設計更有用,因為C建立線程的時候會傳入一個void*指針,通過這種方法就可以傳入資料。然而,由于Boost線程庫是使用函數對象來代替函數指針,那麼函數對象本身就可以攜帶線程所需的資料。這種方法更具靈活性,也是類型安全(type-safe)的。當和Boost.Bind這樣的功能庫一起使用時,這樣的方法就可以讓我們傳遞任意數量的資料給建立的線程。 

目前,由Boost線程庫建立的線程對象功能還不是很強大。事實上它隻能做兩項操作。線程對象可以友善使用==和!=進行比較來确定它們是否是代表同一個線程;我們還可以調用boost::thread::join來等待線程執行完畢。其他一些線程庫可以讓我們對線程做一些其他操作(比如設定優先級,甚至是取消線程)。然而,由于要在普遍适用(portable)的接口中加入這些操作不是簡單的事,目前仍在讨論如何将這些操組加入到Boost線程庫中。 

Listing1展示了boost::thread類的一個最簡單的用法。 建立的線程隻是簡單的在std::out上列印“hello,world”,main函數在它執行完畢之後結束。 

例1: 

#include <boost/thread/thread.hpp>

#include <iostream>

void hello()

{

 std::cout <<"Hello world, I'm a thread!"<< std::endl;

}

int main(int argc, char* argv[])

 boost::thread thrd(&hello);

 thrd.join();

 return 0;

互斥體 

任何寫過多線程程式的人都知道避免不同線程同時通路共享區域的重要性。如果一個線程要改變共享區域中某個資料,而與此同時另一線程正在讀這個資料,那麼結果将是未定義的。為了避免這種情況的發生就要使用一些特殊的原始類型和操作。其中最基本的就是互斥體(mutex,mutual exclusion的縮寫)。一個互斥體一次隻允許一個線程通路共享區。當一個線程想要通路共享區時,首先要做的就是鎖住(lock)互斥體。如果其他的線程已經鎖住了互斥體,那麼就必須先等那個線程将互斥體解鎖,這樣就保證了同一時刻隻有一個線程能通路共享區域。 

互斥體的概念有不少變種。Boost線程庫支援兩大類互斥體,包括簡單互斥體(simple mutex)和遞歸互斥體(recursive mutex)。如果同一個線程對互斥體上了兩次鎖,就會發生死鎖(deadlock),也就是說所有的等待解鎖的線程将一直等下去。有了遞歸互斥體,單個線程就可以對互斥體多次上鎖,當然也必須解鎖同樣次數來保證其他線程可以對這個互斥體上鎖。 

在這兩大類互斥體中,對于線程如何上鎖還有多個變種。一個線程可以有三種方法來對一個互斥體加鎖: 

一直等到沒有其他線程對互斥體加鎖。 

如果有其他互斥體已經對互斥體加鎖就立即傳回。 

一直等到沒有其他線程互斥體加鎖,直到逾時。 

似乎最佳的互斥體類型是遞歸互斥體,它可以使用所有三種上鎖形式。然而每一個變種都是有代價的。是以Boost線程庫允許我們根據不同的需要使用最有效率的互斥體類型。Boost線程庫提供了6中互斥體類型,下面是按照效率進行排序: 

boost::mutex,

boost::try_mutex, 

boost::timed_mutex, 

boost::recursive_mutex, 

boost::recursive_try_mutex, 

boost::recursive_timed_mutex 

如果互斥體上鎖之後沒有解鎖就會發生死鎖。這是一個很普遍的錯誤,Boost線程庫就是要将其變成不可能(至少時很困難)。直接對互斥體上鎖和解鎖對于Boost線程庫的使用者來說是不可能的。mutex類通過teypdef定義在RAII中實作的類型來實作互斥體的上鎖和解鎖。這也就是大家知道的Scope Lock模式。為了構造這些類型,要傳入一個互斥體的引用。構造函數對互斥體加鎖,析構函數對互斥體解鎖。C++保證了析構函數一定會被調用,是以即使是有異常抛出,互斥體也總是會被正确的解鎖。 

這種方法保證正确的使用互斥體。然而,有一點必須注意:盡管Scope Lock模式可以保證互斥體被解鎖,但是它并沒有保證在異常抛出之後貢獻資源仍是可用的。是以就像執行單線程程式一樣,必須保證異常不會導緻程式狀态異常。另外,這個已經上鎖的對象不能傳遞給另一個線程,因為它們維護的狀态并沒有禁止這樣做。 

List2給出了一個使用boost::mutex的最簡單的例子。例子中共建立了兩個新的線程,每個線程都有10次循環,在std::cout上列印出線程id和目前循環的次數,而main函數等待這兩個線程執行完才結束。std::cout就是共享資源,是以每一個線程都使用一個全局互斥體來保證同時隻有一個線程能向它寫入。 

許多讀者可能已經注意到List2中傳遞資料給線程還必須的手工寫一個函數。盡管這個例子很簡單,如果每一次都要寫這樣的代碼實在是讓人厭煩的事。有一種簡單的解決辦法。函數庫允許通過将另一個函數綁定,并傳入調用時需要的資料來建立一個新的函數。 List3展示了如何使用Boost.Bind庫來簡化List2中的代碼,這樣就不必手工寫這些函數對象了。 

例2: 

#include <boost/thread/mutex.hpp>

boost::mutex io_mutex;

struct count

 count(int id) : id(id) { }

 void operator()()

 {

    for (int i = 0; i < 10; ++i)

    {

      boost::mutex::scoped_lock

      lock(io_mutex);

     std::cout << id << ": "<< i << std::endl;

      }

    }

 int id;

};

 boost::thread thrd1(count(1));

 boost::thread thrd2(count(2));

 thrd1.join();

 thrd2.join();

例3: // 這個例子和例2一樣,除了使用Boost.Bind來簡化建立線程攜帶資料,避免使用函數對象 

#include <boost/bind.hpp>

void count(int id)

 for (int i = 0; i < 10; ++i)

   boost::mutex::scoped_lock

    lock(io_mutex);

    std::cout << id << ": " <<i << std::endl;

   }

 boost::thread thrd1(boost::bind(&count, 1));

 boost::thread thrd2(boost::bind(&count, 2));

條件變量 

有的時候僅僅依靠鎖住共享資源來使用它是不夠的。有時候共享資源隻有某些狀态的時候才能夠使用。比如,某個線程如果要從堆棧中讀取資料,那麼若棧中沒有資料就必須等待資料被壓棧。這種情況下的同步使用互斥體是不夠的。另一種同步的方式--條件變量,就可以使用在這種情況下。 

條件變量的使用總是和互斥體及共享資源聯系在一起的。線程首先鎖住互斥體,然後檢驗共享資源的狀态是否處于可使用的狀态。如果不是,那麼線程就要等待條件變量。要指向這樣的操作就必須在等待時将互斥體解鎖,以便其他線程可以通路共享資源并改變其狀态。它還得保證從等到的線程傳回時互斥體是被上鎖得。當另一個線程改變了共享資源的狀态時,它就要通知正在等待條件變量得線程,并将之傳回等待的線程。 

List4是一個使用了boost::condition的簡單例子。有一個實作了有界緩存區的類和一個固定大小的先進先出的容器。由于使用了互斥體boost::mutex,這個緩存區是線程安全的。put和get使用條件變量來保證線程等待完成操作所必須的狀态。有兩個線程被建立,一個在buffer中放入100個整數,另一個将它們從buffer中取出。這個有界的緩存一次隻能存放10個整數,是以這兩個線程必須周期性的等待另一個線程。為了驗證這一點,put和get在std::cout中輸出診斷語句。最後,當兩個線程結束後,main函數也就執行完畢了。 

#include <boost/thread/condition.hpp>

const int BUF_SIZE = 10;

const int ITERS = 100;

class buffer

 public:

   typedef boost::mutex::scoped_lock scoped_lock;

   buffer(): p(0), c(0), full(0)

   {     }

   void put(int m)

scoped_lock lock(mutex);

      if (full == BUF_SIZE)

      {

       {

         boost::mutex::scoped_lock

         lock(io_mutex);

         std::cout <<"Buffer is full. Waiting.."<< std::endl;

        }

        while (full == BUF_SIZE)

            cond.wait(lock);

       }

       buf[p] = m;

       p = (p+1) % BUF_SIZE;

       ++full;

       cond.notify_one();

    int get()

scoped_lock lk(mutex);

      if (full == 0)

        {

          boost::mutex::scoped_lock

          lock(io_mutex);

          std::cout <<"Buffer is empty. Waiting..."<< std::endl;

         }

         while (full == 0)

            cond.wait(lk);

        int i = buf[c];

        c = (c+1) % BUF_SIZE;

        --full;

        cond.notify_one();

        return i;

 private:

   boost::mutex mutex;

  boost::condition cond;

   unsigned int p, c, full;

   int buf[BUF_SIZE];

buffer buf;

void writer()

 for (int n = 0; n < ITERS; ++n)

     {

       boost::mutex::scoped_lock

        lock(io_mutex);

        std::cout << "sending: "<< n << std::endl;

      buf.put(n);

void reader()

 for (int x = 0; x < ITERS; ++x)

      int n = buf.get();

          std::cout << "received: "<< n << std::endl;

 boost::thread thrd1(&reader);

 boost::thread thrd2(&writer);

線程局部存儲 

大多數函數都不是可重入的。這也就是說在某一個線程已經調用了一個函數時,如果再調用同一個函數,那麼這樣是不安全的。一個不可重入的函數通過連續的調用來儲存靜态變量或者是傳回一個指向靜态資料的指針。舉例來說,std::strtok就是不可重入的,因為它使用靜态變量來儲存要被分割成符号的字元串。 

有兩種方法可以讓不可重用的函數變成可重用的函數。第一種方法就是改變接口,用指針或引用代替原先使用靜态資料的地方。比方說,POSIX定義了strok_r,std::strtok中的一個可重入的變量,它用一個額外的char**參數來代替靜态資料。這種方法很簡單,而且提供了可能的最佳效果。但是這樣必須改變公共接口,也就意味着必須改代碼。另一種方法不用改變公有接口,而是用本地存儲線程(thread local storage)來代替靜态資料(有時也被成為特殊線程存儲,thread-specific storage)。 

Boost線程庫提供了智能指針boost::thread_specific_ptr來通路本地存儲線程。每一個線程第一次使用這個智能指針的執行個體時,它的初值是NULL,是以必須要先檢查這個它的隻是否為空,并且為它指派。Boost線程庫保證本地存儲線程中儲存的資料會線上程結束後被清除。 

List5是一個使用boost::thread_specific_ptr的簡單例子。其中建立了兩個線程來初始化本地存儲線程,并有10次循環,每一次都會增加智能指針指向的值,并将其輸出到std::cout上(由于std::cout是一個共享資源,是以通過互斥體進行同步)。main線程等待這兩個線程結束後就退出。從這個例子輸出可以明白的看出每個線程都處理屬于自己的資料執行個體,盡管它們都是使用同一個boost::thread_specific_ptr。 

例5: 

#include <boost/thread/tss.hpp>

boost::thread_specific_ptr<int> ptr;

   count(int id) : id(id) { }

   void operator()()

   {

      if (ptr.get() == 0)

        ptr.reset(new int(0));

      for (int i = 0; i < 10; ++i)

         (*ptr)++;

          std::cout << id << ": "<< *ptr << std::endl;

   int id;

        boost::thread thrd1(count(1));

        boost::thread thrd2(count(2));

        thrd1.join();

        thrd2.join();

        return 0;

僅運作一次的例程 

還有一個問題沒有解決:如何使得初始化工作(比如說構造函數)也是線程安全的。比如,如果一個引用程式要産生唯一的全局的對象,由于執行個體化順序的問題,某個函數會被調用來傳回一個靜态的對象,它必須保證第一次被調用時就産生這個靜态的對象。這裡的問題就是如果多個線程同時調用了這個函數,那麼這個靜态對象的構造函數就會被調用多次,這樣錯誤産生了。 

解決這個問題的方法就是所謂的“一次實作”(once routine)。“一次實作”在一個應用程式隻能執行一次。如果多個線程想同時執行這個操作,那麼真正執行的隻有一個,而其他線程必須等這個操作結束。為了保證它隻被執行一次,這個routine由另一個函數間接的調用,而這個函數傳給它一個指針以及一個标志着這個routine是否已經被調用的特殊标志。這個标志是以靜态的方式初始化的,這也就保證了它在編譯期間就被初始化而不是運作時。是以也就沒有多個線程同時将它初始化的問題了。Boost線程庫提供了boost::call_once來支援“一次實作”,并且定義了一個标志boost::once_flag及一個初始化這個标志的宏BOOST_ONCE_INIT。

List6是一個使用了boost::call_once的例子。其中定義了一個靜态的全局整數,初始值為0;還有一個由BOOST_ONCE_INIT初始化的靜态boost::once_flag執行個體。main函數建立了兩個線程,它們都想通過傳入一個函數調用boost::call_once來初始化這個全局的整數,這個函數是将它加1。main函數等待着兩個線程結束,并将最後的結果輸出的到std::cout。由最後的結果可以看出這個操作确實隻被執行了一次,因為它的值是1。 

#include <boost/thread/once.hpp>

int i = 0;

boost::once_flag flag = BOOST_ONCE_INIT;

void init()

  ++i;

void thread()

 boost::call_once(&init, flag);

   boost::thread thrd1(&thread);

   boost::thread thrd2(&thread);

   thrd1.join();

   thrd2.join();

   std::cout << i << std::endl;

   return 0;

Boost線程庫的未來 

Boost線程庫正在計劃加入一些新特性。其中包括boost::read_write_mutex,它可以讓多個線程同時從共享區中讀取資料,但是一次隻可能有一個線程向共享區寫入資料;boost::thread_barrier,它使得一組線程處于等待狀态,知道所有得線程都都進入了屏障區;boost::thread_pool,允許執行一些小的routine而不必每一都要建立或是銷毀一個線程。

繼續閱讀