天天看點

IBM文章使用 Boost 的 IPC 和 MPI 庫進行并發程式設計 Boost C++ 庫讓并發程式設計變得既簡單又有趣。學習如何使用兩個 Boost 庫 —— Interprocess (IPC) 庫和 Message Passing Interface (MPI) 實作共享記憶體對象、同步檔案鎖和分布式通信等功能。

Boost 

C++

 庫讓并發程式設計變得既簡單又有趣。學習如何使用兩個 Boost 庫 —— Interprocess (IPC) 庫和 Message Passing Interface (MPI) 實作共享記憶體對象、同步檔案鎖和分布式通信等功能。

http://www.ibm.com/developerworks/cn/views/aix/libraryview.jsp

Arpan Sen, 獨立作家

2011 年 6 月 07 日

  • IBM文章使用 Boost 的 IPC 和 MPI 庫進行并發程式設計 Boost C++ 庫讓并發程式設計變得既簡單又有趣。學習如何使用兩個 Boost 庫 —— Interprocess (IPC) 庫和 Message Passing Interface (MPI) 實作共享記憶體對象、同步檔案鎖和分布式通信等功能。
    内容
IBM文章使用 Boost 的 IPC 和 MPI 庫進行并發程式設計 Boost C++ 庫讓并發程式設計變得既簡單又有趣。學習如何使用兩個 Boost 庫 —— Interprocess (IPC) 庫和 Message Passing Interface (MPI) 實作共享記憶體對象、同步檔案鎖和分布式通信等功能。

在 IBM Bluemix 雲平台上開發并部署您的下一個應用。

現在就開始免費試用

使用非常流行的 Boost 庫進行并發程式設計非常有意思。Boost 有幾個用于并發程式設計領域的庫:Interprocess (IPC) 庫用于實作共享記憶體、記憶體映射的 I/O 和消息隊列;Thread 庫用于實作可移植的多線程;Message Passing Interface (MPI) 庫用于分布式計算中的消息傳遞;Asio 庫用于使用套接字和其他低層功能實作可移植的連網功能。本文介紹 IPC 和 MPI 庫以及它們提供的一些功能。

本文中将學習如何使用 Boost IPC 庫實作共享記憶體對象、消息隊列和同步檔案鎖。通過使用 Boost MPI 庫,了解 

environment

 和 

communicator

 類,以及如何實作分布式通信。

注意:本文中的代碼已經用 

gcc-4.3.4

 和 

boost-1.45

 包測試過了。

常用縮寫詞

  • API:應用程式程式設計接口
  • I/O:輸入/輸出
  • POSIX:針對 UNIX 的便攜式作業系統接口®
  • SDK:軟體開發工具包

使用 Boost IPC 庫

Boost Interprocess 是一個隻由頭檔案組成的庫,是以您需要做的隻是在自己的源代碼中包含适當的頭檔案并讓編譯器知道 

include

 路徑。這是一個非常好的特性;您隻需下載下傳 Boost 源代碼(見 參考資料 中的連結),然後就可以開始使用了。例如,要想在自己的代碼中使用共享記憶體,就使用 清單 1 所示的 

include

清單 1. Boost IPC 庫隻由頭檔案組成
#include <boost/interprocess/shared_memory_object.hpp>
using namespace boost::interprocess; 
//… your sources follow …      

在把資訊傳遞給編譯器時,您要求程序根據安裝相應地修改 

include

 路徑。然後,編譯代碼:

bash-4.1$  g++ ipc1.cpp –I../boost_1_45_0      

建立共享記憶體對象

我們先從傳統的 "Hello World!" 程式開始。有兩個程序:第一個程序把字元串 "Hello World!" 寫入記憶體,另一個程序讀取并顯示此字元串。像 清單 2 這樣建立共享記憶體對象。

清單 2. 建立共享記憶體對象
#include <boost/interprocess/shared_memory_object.hpp>

int main(int argc, char* argv[ ]) 
{
    using namespace using boost::interprocess; 
    try { 
    // creating our first shared memory object.
    shared_memory_object sharedmem1 (create_only, "Hello", read_write);

    // setting the size of the shared memory
    sharedmem1.truncate (256);

    // … more code follows
    } catch (interprocess_exception& e) { 
    // .. .  clean up 
    } 
}      

sharedmem1

 對象的類型是 

shared_memory_object

(在 Boost 頭檔案中聲明并定義),它的構造函數有三個參數:

  • 第一個參數 —

     create_only 

    — 表示要建立這個共享記憶體對象而且還沒有建立它。如果已經存在同名的共享對象,就會抛出異常。對于希望通路已經建立的共享記憶體的程序,第一個參數應該是 

    open_only

  • 第二個參數 —

     Hello 

    — 是共享記憶體區域的名稱。另一個程序将使用這個名稱通路這個共享記憶體。
  • 第三個參數 —

     read_write 

    — 是共享記憶體對象的通路訓示符。因為這個程序要修改共享記憶體對象的内容,是以使用 

    read_write

    。隻從共享記憶體讀取資料的程序使用 

    read_only

     訓示符。

truncate

 方法以位元組為機關設定共享記憶體的大小。最好把代碼放在 

try-catch

 塊中。例如,如果無法建立共享記憶體對象,就抛出類型為

boost::interprocess_exception

 的異常。

使用共享記憶體對象寫資料

使用共享記憶體對象的程序必須在自己的位址空間中映射對象。使用在頭檔案 mapped_region.hpp 中聲明并定義的 

mapped_region

 類執行映射。使用 

mapped_region

 的另一個好處是可以對共享記憶體對象進行完全和部分通路。清單 3 示範如何使用 

mapped_region

清單 3. 使用 mapped_region 通路共享記憶體對象
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>

int main(int argc, char* argv[ ]) 
{
    using namespace boost::interprocess; 
    try { 
    // creating our first shared memory object.
    shared_memory_object sharedmem1 (create_only, "Hello", read_write);

    // setting the size of the shared memory
    sharedmem1.truncate (256);

   // map the shared memory to current process 
   mapped_region mmap (sharedmem1, 256); 

    // access the mapped region using get_address
    std::strcpy(static_cast<char* >(region.get_address()), "Hello World!\n");
    
    } catch (interprocess_exception& e) { 
    // .. .  clean up 
    } 
}      

就這麼簡單。現在已經建立了您自己的 

mapped_region

 對象并使用 

get_address

 方法通路了它。執行了 

static_cast

,因為 

get_address

 傳回一個 

void*

當主程序退出時共享記憶體會怎麼樣?

當主程序退出時,并不删除共享記憶體。要想删除共享記憶體,需要調用 

shared_memory_object::remove

。第二個程序的通路機制也很簡單:清單 4 證明了這一點。

清單 4. 從第二個程序通路共享記憶體對象
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <cstring>
#include <cstdlib>
#include <iostream>

int main(int argc, char *argv[ ])
{
      using namespace boost::interprocess; 
      try { 
      // opening an existing shared memory object 
      shared_memory_object sharedmem2 (open_only, "Hello", read_only);

      // map shared memory object in current address space
      mapped_region mmap (sharedmem2, read_only);

      // need to type-cast since get_address returns void* 
      char *str1 = static_cast<char*> (mmap.get_address());
      std::cout << str1 << std::endl;
      } catch (interprocess_exception& e) { 
          std::cout << e.what( ) << std::endl;
      } 
      return 0;
}      

在清單 4 中,使用 

open_only

 和 

read_only

 屬性建立共享記憶體對象。如果無法找到這個共享記憶體對象,就會抛出異常。現在,建構并運作 清單 3 和 清單 4 中的代碼。應該會在終端上看到 "Hello World!"。

接下來,在第二個程序的代碼(清單 4)中 

std::cout

 後面添加以下代碼并重新建構代碼:

// std::cout code here
shared_memory_object::remove("Hello");
// } catch(interprocess_exception& e) {      

連續執行代碼兩次,第二次執行會顯示 "No such file or directory",這證明共享記憶體已經被删除了。

回頁首

使用消息隊列實作程序間通信

現在,研究另一種流行的程序間通信機制:消息隊列。每個參與通信的程序都可以在隊列中添加消息和從隊列讀取消息。消息隊列具有以下性質:

  • 它有名稱,程序使用名稱通路它。
  • 在建立隊列時,使用者必須指定隊列的最大長度和一個消息的最大大小。
  • 隊列是持久的,這意味着當建立它的程序死亡之後它仍然留在記憶體中。可以通過顯式地調用

    boost::interprocess::message_queue::remove

     删除隊列。

在 清單 5 所示的代碼片段中,程序建立了一個可包含 20 個整數的消息隊列。

清單 5. 建立一個可包含 20 個整數的消息隊列
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream> 

int main(int argc, char* argv[ ]) 
{
    using namespace boost::interprocess;
    try { 
      // creating a message queue
      message_queue mq (create_only,   // only create
                                       "mq",              // name
                                        20,                 //max message count
                                        sizeof(int)      //max message size
                                        );
       // … more code follows
    } catch (interprocess_exception& e) { 
       std::cout << e.what( ) << std::endl;
    } 
}      

注意傳遞給 

message_queue

 的構造函數的 

create_only

 屬性。與共享記憶體對象相似,對于以隻讀方式打開消息隊列,應該把 

open_only

 屬性傳遞給構造函數。

發送和接收資料

在發送方,使用隊列的 

send

 方法添加資料。

send

 方法有三個輸入參數:原始資料的指針 (

void*

)、資料的大小和優先級。目前,以相同的優先級發送所有資料。清單 6 給出代碼。

清單 6. 向隊列發送消息
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream> 

int main(int argc, char* argv[ ]) 
{
    using namespace boost::interprocess;
    try { 
      // creating a message queue
      message_queue mq (create_only,   // only create
                                       "mq",              // name
                                        20,                 //max message count
                                        sizeof(int)      //max message size
                                        );
      // now send the messages to the queue
      for (int i=0; i<20; ++i) 
        mq.send(&i, sizeof(int), 0); // the 3rd argument is the priority 
    } catch (interprocess_exception& e) { 
        std::cout << e.what( ) << std::endl;
    } 
}      

在接收方,使用 

open_only

 屬性建立隊列。通過調用 

message_queue

 類的 

receive

 方法從隊列擷取消息。清單 7 給出 

receive

 的方法簽名。

清單 7. message_queue::receive 的方法簽名
void receive (void *buffer,           
                      std::size_t buffer_size, 
                      std::size_t &recvd_size,
                      unsigned int &priority
                     );      

我們來仔細看一下。第一個參數是從隊列接收的資料将被存儲到的位置。第二個參數是接收的資料的預期大小。第三個參數是接收的資料的實際大小。第四個參數是接收的消息的優先級。顯然,如果在執行程式期間第二個和第三個參數不相等,就是出現錯誤了。清單 8 給出接收者程序的代碼。

清單 8. 從消息隊列接收消息
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream> 

int main(int argc, char* argv[ ]) 
{
    using namespace boost::interprocess;
    try { 
      // opening the message queue whose name is mq
      message_queue mq (open_only,     // only open
                                       "mq"               // name
                                        );
      size_t recvd_size; 
      unsigned int priority; 

      // now send the messages to the queue
      for (int i=0; i<20; ++i) { 
        int buffer; 
        mq.receive ((void*) &buffer, sizeof(int), recvd_size, priority); 
        if (recvd_size != sizeof(int)) 
            ; // do the error handling
        std::cout << buffer << " " << recvd_size << " " << priority;
      } 
    } catch (interprocess_exception& e) { 
        std::cout << e.what( ) << std::endl;
    } 
}      

這相當簡單。注意,仍然沒有從記憶體中删除消息隊列;與共享記憶體對象一樣,這個隊列是持久的。要想删除隊列,應該在使用完隊列之後添加以下行:

message_queue::remove("mq"); // remove the queue using its name      

消息優先級

在發送方,做 清單 9 所示的修改。接收方代碼不需要修改。

清單 9. 修改消息的優先級
message_queue::remove("mq"); // remove the old queue
      message_queue mq (…); // create as before
      for (int i=0; i<20; ++i) 
        mq.send(&i, sizeof(int), i%2); // 第 3 個參數為消息的優先級
      // … rest as usual      

再次運作代碼時,應該會看到 清單 10 所示的輸出。

清單 10. 在接收程序中看到的輸出
1 4 1
3 4 1
5 4 1
7 4 1
9 4 1
11 4 1
13 4 1
15 4 1
17 4 1
19 4 1
0 4 0
2 4 0
4 4 0
6 4 0
8 4 0
10 4 0
12 4 0
14 4 0
16 4 0
18 4 0      

清單 10 證明,第二個程序優先接收優先級高的消息。

回頁首

同步對檔案的通路

共享記憶體和消息隊列很不錯,但是檔案 I/O 也是重要的程序間通信工具。對并發程序用于通信的檔案通路進行同步并非易事,但是 Boost IPC 庫提供的檔案鎖功能讓同步變得簡單了。在進一步解釋之前,來看一下 清單 11,了解 

file_lock

 對象是如何工作的。

清單 11. 使用 file_lock 對象同步檔案通路
#include <fstream> 
#include <iostream> 
#include <boost/interprocess/sync/file_lock.hpp> 
#include <cstdlib>

int main() 
{ 
    using namespace boost::interprocess; 
    std::string fileName("test"); 
    std::fstream file;

    file.open(fileName.c_str(), std::ios::out | std::ios::binary | 
std::ios::trunc); 
    if (!file.is_open() || file.bad()) 
    { 
        std::cout << "Open failed" << std::endl; 
        exit(-1); 
    }

    try { 
    file_lock f_lock(fileName.c_str());
    f_lock.lock();
    std::cout << "Locked in Process 1" << std::endl;
    file.write("Process 1", 9);
    file.flush(); 
    f_lock.unlock();
    std::cout << "Unlocked from Process 1" << std::endl;
    } catch (interprocess_exception& e) { 
    std::cout << e.what( ) << std::endl;
    }

    file.close();
    return 0; 
}      

代碼首先打開一個檔案,然後使用 

file_lock

 鎖定它。寫操作完成之後,它重新整理檔案緩沖區并解除檔案鎖。使用 

lock

 方法獲得對檔案的獨占通路。如果另一個程序也試圖對此檔案進行寫操作并已經請求了鎖,那麼它會等待,直到第一個程序使用 

unlock

 自願地放棄鎖。

file_lock

類的構造函數接受要鎖定的檔案的名稱,一定要在調用 

lock

 之前打開檔案;否則會抛出異常。

現在,複制 清單 11 中的代碼并做一些修改。具體地說,讓第二個程序請求這個鎖。清單 12 給出相關修改。

清單 12. 試圖通路檔案的第二個程序的代碼
// .. as in Listing 11
    file_lock f_lock(fileName.c_str());
    f_lock.lock();
    std::cout << "Locked in Process 2" << std::endl;
    system("sleep 4"); 
    file.write("Process 2", 9);
    file.flush(); 
    f_lock.unlock();
    std::cout << "Unlocked from Process 2" << std::endl;
    // file.close();      

現在,如果這兩個程序同時運作,有 50% 的機會看到第一個程序等待 4 秒後才獲得 

file_lock

,其他情況都不變。

在使用 

file_lock

 時,必須記住幾點。這裡讨論的主題是程序間通信,重點在程序 上。這意味着,不是使用 

file_lock

 來同步同一程序中各個線程的資料通路。在與 POSIX 相容的系統上,檔案句柄是程序屬性,而不是 線程屬性。下面是使用檔案鎖的幾條規則:

  • 對于每個程序,每個檔案使用一個 

    file_lock

     對象。
  • 使用相同的線程來鎖定和解鎖檔案。
  • 在解鎖檔案之前,通過調用 

    C

     的 

    flush

     庫例程或 

    flush

     方法(如果喜歡使用 

    C++ fstream

     的話),重新整理寫入者程序中的資料。

結合使用 file_lock 和有範圍(scope)的鎖

在執行程式時,可能會出現抛出異常而檔案沒有解鎖的情況。這種情況可能會導緻意外的程式行為。為了避免這種情況,可以考慮把

file_lock

 對象放在(boost/interprocess/sync/scoped_lock.hpp 中定義的)

scoped_lock

 中。如果使用 

scoped_lock

,就不需要顯式地鎖定或解鎖檔案;鎖定發生在構造器内,每當您退出該範圍,就會自動發生解鎖。清單 13 給出對 清單 11 的修改,使之使用有範圍的鎖。

清單 13. 結合使用 scoped_lock 和 file_lock
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/file_lock.hpp>

//… code as in Listing 11
file_lock f_lock(fileName.c_str());
scoped_lock<file_lock> s_lock(f_lock);  // internally calls f_lock.lock( ); 

// No need to call explicit lock anymore
std::cout << "Locked in Process 1" << std::endl;
file.write("Process 1", 9);
// … code as in Listing 11      

注意:關于 Resource Acquisition Is Initialization (RAII) 程式設計習慣法的更多資訊,參見 參考資料 中的連結。

回頁首

了解 Boost MPI

如果您不熟悉 Message Passing Interface,那麼在讨論 Boost MPI 之前,應該先浏覽 參考資料 中提供的 MPI 參考資料。MPI 是一個容易使用的标準,它采用通過傳遞消息實作程序間通信的模型。不需要使用套接字或其他通信原語;MPI 後端管理所有底層處理。那麼,使用 Boost MPI 有什麼好處?Boost MPI 的建立者提供了更高層的抽象,并在 MPI 提供的 API 之上建構了一套簡單的例程,比如 

MPI_Init

 和 

MPI_Bcast

Boost MPI 不是一個單獨的庫,不能在下載下傳和建構之後直接使用。相反,必須安裝任意 MPI 實作(比如 MPICH 或 Open MPI)并建構 Boost Serialization 庫。關于如何建構 Boost MPI 的詳細資訊參見 參考資料。通常,使用以下指令建構 Boost MPI:

bash-4.1$ bjam –with-mpi      

Windows® 使用者可以從 BoostPro 下載下傳預先建構的 MPI 庫(見 參考資料)。這些庫與 Microsoft® HPC Pack 2008 和 2008 R2 相容(見 參考資料),适用于帶 Service Pack 3 的 Windows XP 或更高版本的客戶機操作環境。

回頁首

用 MPI 實作 Hello World 程式

您必須了解 Boost MPI 庫中的兩個主要類:

environment

 類和 

communicator

 類。前者負責分布式環境的初始化;後者用于程序之間的通信。因為這裡讨論的是分布式計算,我們有四個程序,它們都在終端上輸出 "Hello World"。清單 14 給出代碼。

清單 14. 使用 Boost MPI 的 Hello World
#include <boost/mpi.hpp>
#include <iostream>

int main(int argc, char* argv[])
{
  boost::mpi::environment env(argc, argv);
  boost::mpi::communicator world;

  std::cout << argc << std::endl;
  std::cout << argv[0] << std::endl;
  std::cout << "Hello World! from process " << world.rank() << std::endl;

  return 0;
}      

現在,建構 清單 14 中的代碼并連結 Boost MPI 和 Serialization 庫。在 shell 提示上運作可執行程式。應該會看到 "Hello World! from process 0"。接下來,使用 MPI 分派器工具(例如,對于 Open MPI 使用者,使用 

mpirun

;對于 Microsoft HPC Pack 2008,使用 

mpiexec

)并運作可執行程式:

mpirun –np 4 <executable name> 

OR

mpiexec –n 4 <executable name>      

現在應該會看到與 清單 15 相似的輸出,其中的 mympi1 是可執行程式名稱。

清單 15. 運作 MPI 代碼的輸出
1
mympi1
Hello, World! from process 3
1
mympi1
1
mympi1
Hello, World! from process 1
Hello, World! from process 2
1
mympi1
Hello, World! from process 0      

在 MPI 架構中,已經建立了相同程序的四個拷貝。在 MPI 環境中,每個程序有惟一的 ID(由 

communicator

 對象決定)。現在,試試程序間通信。使用 

send

 和 

receive

 函數調用讓一個程序與另一個程序通信。發送消息的程序稱為主程序,接收消息的程序稱為工作者程序。主程序和接收者程序的源代碼是相同的,使用 

world

 對象提供的等級決定功能(見 清單 16)。

清單 16. 互相通信的程序 0、1 和 2 的代碼
#include <boost/mpi.hpp>
#include <iostream>

int main(int argc, char* argv[]) 
{
  boost::mpi::environment env(argc, argv);
  boost::mpi::communicator world;

  if (world.rank() == 0) {
    world.send(1, 9, 32);
    world.send(2, 9, 33);
  } else { 
    int data;
    world.recv(0, 9, data);
    std::cout << "In process " << world.rank( ) << "with data " << data
                   << std::endl;
  } 
  return 0;
}      

先看一下 

send

 函數。第一個參數是接收者程序的 ID;第二個是消息資料的 ID;第三個是實際資料。為什麼需要消息标簽?接收者程序在執行期間的特定點上可能希望處理具有特定标簽的消息,是以這個方案會有幫助。對于程序 1 和 2,

recv

 函數被阻塞,這意味着程式會等待,直到從程序 0 收到标簽 ID 為 9 的消息。當收到這個消息時,把資訊存儲在 

data

 中。下面是運作代碼的輸出:

In process 1 with data 32
In process 2 with data 33      

如果在接收方有 

world.recv(0, 1, data);

 這樣的代碼,會發生什麼?代碼阻塞,但實際上是,接收者程序在等待一個永遠不會到達的消息。

回頁首

結束語

本文隻讨論了這兩個庫提供的功能的很小一部分。這些庫提供的其他功能包括 IPC 的記憶體映射 I/O 和 MPI 的廣播功能。從易用性的角度來說,IPC 更好。MPI 庫依賴于原生的 MPI 實作,而原生 MPI 庫以及預先建構的 Boost MPI 和 Serialization 庫的現成可用性仍然是個問題。但是,花點兒精力建構 MPI 實作和 Boost 的源代碼是值得的。

參考資料

學習

  • 了解關于 程序間通信 的更多資訊。
  • 了解如何 建構 Boost MPI。
  • 了解所有 

    mpirun

     選項。
  • 了解關于 RAII 習慣法 的更多資訊。
  • 閱讀 Message Passing Interface 标準。
  • 了解關于 Microsoft HPC Pack 2008/R2 SDK 的更多資訊。
  • AIX and UNIX 專區:developerWorks 的“AIX and UNIX 專區”提供了大量與 AIX 系統管理的所有方面相關的資訊,您可以利用它們來擴充自己的 UNIX 技能。
  • AIX and UNIX 新手入門:通路“AIX and UNIX 新手入門”頁面可了解更多關于 AIX 和 UNIX 的内容。
  • AIX and UNIX 專題彙總:AIX and UNIX 專區已經為您推出了很多的技術專題,為您總結了很多熱門的知識點。我們在後面還會繼續推出很多相關的熱門專題給您,為了友善您的通路,我們在這裡為您把本專區的所有專題進行彙總,讓您更友善的找到您需要的内容。
  • AIX and UNIX 下載下傳中心:在這裡你可以下載下傳到可以運作在 AIX 或者是 UNIX 系統上的 IBM 伺服器軟體以及工具,讓您可以提前免費試用他們的強大功能。
  • IBM Systems Magazine for AIX 中文版:本雜志的内容更加關注于趨勢和企業級架構應用方面的内容,同時對于新興的技術、産品、應用方式等也有很深入的探讨。IBM Systems Magazine 的内容都是由十分資深的業内人士撰寫的,包括 IBM 的合作夥伴、IBM 的主機工程師以及進階管理人員。是以,從這些内容中,您可以了解到更高層次的應用理念,讓您在選擇和應用 IBM 系統時有一個更好的認識。
  • 技術書店:浏覽關于這些和其他技術主題的圖書。

獲得産品和技術

  • 進一步了解并下載下傳 Boost Thread 庫。
  • 進一步了解并下載下傳 Boost MPI 庫。
  • 從 BoostPro 下載下傳預先建構的 MPI 庫。
  • 下載下傳 Boost IPC 庫。
  • 通路 MPICH2 下載下傳站點。
  • 通路 Open MPI v1.4 下載下傳站點。

讨論

  • developerWorks 部落格:閱讀我們的部落格并加入 developerWorks 社群。
  • 參與 AIX 和 UNIX 論壇:
    • AIX 5L — 技術論壇
    • AIX for Developers 論壇
    • 叢集系統管理
    • IBM Support Assistant
    • 性能工具 — 技術
    • 更多 AIX 和 UNIX 論壇