天天看點

[ Linux ] 程序間通信介紹 管道

0.程序間通信介紹

0.1通信背景

在之前我們學習程序時知道程序具有獨立性,是以程序間互動資料的成本就變得非常高。程序之間為什麼也進行程序間通信,這就需要談談程序間通信的目的了。但是程序具有獨立性不是徹底獨立,隻管自己不理任何人,當然不是這樣的。程序之間也是存在通信的。那麼程序間通信的方式也有很多種,包括管道,System V IPC,POSIX IPC........那麼今天我們先來看看程序間如何通過管道來進行通信。

0.2程序間通信目的

  • 資料傳輸:一個程序需要将它的資料發送給另一個程序。
  • 資源共享:多個程序之間共享同樣的資源。
  • 通知事件:一個程序需要向另一個或一組程序發送消息,通知它(它們)發生了某種時間(如程序終止時要通知父程序)。
  • 程序控制:有些程序希望完全控制另一個程序的執行(如Debug程序),此時控制程序希望能夠攔截另一個程序的所有陷入和異常,并能夠及時知道它的狀态改變。

1.管道

1.1 管道是什麼

管道是Unix中最古老的程序間通信的形式。我們把從一個程序連接配接到另一個程序的一個資料流稱為一個“管道”。在Linux中管道分兩種:匿名管道和命名管道。

[ Linux ] 程式間通信介紹 管道

1.2 匿名管道

假設現在在記憶體中有兩個獨立的程序,如果要想這兩個程序之間通信,那麼程序1可以先把資料拷貝到磁盤上,然後程序2再去讀這個資料【如下圖所示】....首先不考慮這種方法是否可行,但是比較好的讓我們了解了一個道理:程序在通信之前,必須讓不同的程序能夠看到同一份資源(檔案,記憶體塊.....)

[ Linux ] 程式間通信介紹 管道

是以在通信之前,如何解決讓程序先看到同一份資源呢?并且資源的不同決定了不同種類的通信方式!!!

是以我們正在學習的管道是提供共享資源的一種手段。

我們知道檔案在記憶體和磁盤之間來回切換是非常耗費時間的,是以程序間通信大多都是記憶體級别的。意思就是在記憶體内部重建一塊區域進行通信。

[ Linux ] 程式間通信介紹 管道

那麼什麼是管道呢?

在計算機通信中,我們把檔案不再是一個磁盤檔案,通過特定的接口表征自己的身份,說明他和磁盤脫離,自己讀寫資料是就在檔案的記憶體緩沖區,完成資料互動,我們把這個檔案叫做管道。是以我們說Linux下一切皆檔案,管道也是檔案。管道就是一個記憶體級檔案。内容不需要重新整理到磁盤中。

1.2.1管道通信的特點

在我們生活中遇到的管道有什麼特點呢?那首先問那些都屬于管道呢?天然氣管道,水龍頭管道等等.....那麼這些管道大多數情況下都是單向的,并且這些管道都是傳輸資源的,在計算機中最重要的資源就是資料。

  1. 單向的
  2. 傳輸資料

那麼我們如何來保證單向性呢?

我們來看下圖,父程序和子程序通過管道完成程序通信如何保證單向性呢,我們剛剛提到了管道是一個檔案,是資料的緩沖區,是以當父程序把需要通信的資料通過寫的方式寫入管道内時,子程序通過讀的方式拿到這些資源即可完成父子間的通信。為了保證單向的,我們需要關閉父程序的讀端,讓父程序隻能寫,關閉子程序的寫段,讓子程序隻能讀。通過這樣的方式我們就可以保證父程序隻能寫資料子程序隻能讀資料的單向性。

父程序必須以讀寫方式打開,這是因為子程序會繼承下去,這樣子程序就不用再打開了。那麼誰決定父子關閉什麼讀寫?這不是由管道決定的,這是由我們的需求所決定的。

[ Linux ] 程式間通信介紹 管道

那麼我們如何來打開管道呢?難道要調用兩次open()嗎?當然不是了,是以作業系統提供了pipe()接口

1.2.2 匿名管道編碼

認識pipe()接口,當我們調用piep時,底層會自動幫助我們把檔案以讀方式和寫方式打開,而且我們會的到兩個檔案描述符,這兩個檔案描述符會寫進pipefd數組内,是以這個數組是一個輸出型參數。并且pipe是一個系統調用。傳回0表示成功,傳回-1表示失敗。

[ Linux ] 程式間通信介紹 管道

接下來我們進行管道的代碼:

下面這段代碼是管道的建立和驗證輸出的數組是否使我們所想的兩個檔案描述符

#include <iostream>
#include <cstdio>
#include <unistd.h>

using namespace std;

int main()
{
    int pipefd[2] = {0};
    if(pipe(pipefd) != 0)
    {
        cerr<<"pipe error"<<endl;
        return 1;
    }
    cout<<"fd[0]:"<<pipefd[0]<<endl;
    cout<<"fd[1]:"<<pipefd[1]<<endl;

    return 0;
}      
[ Linux ] 程式間通信介紹 管道

匿名管道代碼示範

#include <iostream>
#include <cstdio>
#include <cstring>
#include <sys/wait.h>
#include <sys/types.h>
#include <unistd.h>

using namespace std;

//示範pipe管道通信的基本過程 -- 匿名管道
int main()
{
    //1.建立管道
    int pipefd[2] = {0};
    if(pipe(pipefd) != 0)
    {
        cerr<<"pipe error"<<endl;
        return 1;
    }

    //2.建立子程序
    pid_t id = fork();
    if(id<0)
    {
        cerr<<"fork error"<<endl;
        return 2;
    }
    else if(id == 0)
    {   
        //child  來進行讀取
        close(pipefd[1]);
        char buffer[1024];
        while(true)
        {
            memset(buffer,0,sizeof(buffer));
            ssize_t s = read(pipefd[0],buffer,sizeof(buffer) - 1 );
            if(s>0)
            {
                //讀取成功
                buffer[s] = '\0';
                cout<<"子程序收到消息,消息的内容是:"<< buffer <<endl;
            }
            else if(s == 0)
            {
                cout<<"父程序寫完了,我也退出了"<<endl;
                break;
            }
            else
            {
                //do nothing
            }
        }

        close(pipefd[0]); 
        exit(0);
    }
    else
    {
        //parent  來進行寫入
        close(pipefd[0]);
        string msg = "你好子程序,我是父程序!";
        int cnt = 0;
        while(cnt<5)
        {
            write(pipefd[1],msg.c_str(),msg.size());
            sleep(1);
            cnt++;
        }
        close(pipefd[1]);
        cout<<"父程序的工作結束了 退出"<<endl;
    }
    pid_t res =  waitpid(id,nullptr,0);
    if(res > 0 )
    {
        cout<<"父程序等待成功"<<endl;
    }
    //pipefd[0]  是讀
    //pipefd[1]  是寫

    // cout<<"fd[0]:"<<pipefd[0]<<endl;
    // cout<<"fd[1]:"<<pipefd[1]<<endl;
    return 0;
}      
[ Linux ] 程式間通信介紹 管道

至此父程序的資料就發給了子程序,子程序也能夠接受父程序發送來的資料。在上述代碼中,我們在父程序中帶了sleep(1),讓父程序每間1秒向管道内寫入一個資料,那麼子程序沒有帶sleep(1),為什麼子程序也會随之休眠一秒呢?為了更好的看到這個現象,我們在父子程序裡面帶上時間戳,寫寫日志。

[ Linux ] 程式間通信介紹 管道

我們通過測試觀察到子程序代碼沒有任何的休眠,子程序會随着父程序的節奏讀取,那麼我們可以得出結論:當父程序沒有寫入資料的時候,子程序在等!是以,父程序寫入之後,子程序才能read(會傳回)到資料,子程序列印讀取資料要以父程序的節奏為主!

那麼,父程序和子程序讀寫的時候是有一定的順序性的!當父程序向管道寫入的時候,子程序才可以讀!

  • 管道内部,沒有資料,reader就必須阻塞等待(read時等待)
  • 管道内部,如果資料寫滿,writer就必須阻塞等到(writer時等待)

是以pipe内部是自帶通路控制機制的以及存在同步和互斥機制的。

所謂的阻塞等待的本質是将目前的tast_struct 放入等待隊列中,将PCB的狀态由R->S/D/T

父程序控制子程序的行為

假設父程序想讓我的子程序做父程序想讓子程序做的行為,以及父程序想控制一批(多個)子程序.....該如何來寫呢???

我們先寫一段父程序控制子程序的行為的代碼

#include <iostream>
#include <vector>
#include <unordered_map>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#include <ctime>
#include <sys/wait.h>
#include <sys/types.h>
#include <unistd.h>
#include <cassert>

using namespace std;

// 定義一個函數指針
typedef void (*functor)();

vector<functor> functors; //方法的集合

// for debug
unordered_map<uint32_t, string> info;

void f1() { cout << "這是一個處理日志的任務,執行的程序id: [" << getpid() << "]"
                 << "執行的時間是:[" << time(nullptr) << "]" << endl; }

void f2() { cout << "這是一個處理資料備份的任務,執行的程序id: [" << getpid() << "]"
                 << "執行的時間是:[" << time(nullptr) << "]" << endl; }

void f3() { cout << "這是一個處理網絡連接配接的任務,執行的程序id: [" << getpid() << "]"
                 << "執行的時間是:[" << time(nullptr) << "]" << endl; }

void loadFunctor()
{
    info.insert({functors.size(), "處理日志任務"});
    functors.push_back(f1);

    info.insert({functors.size(), "資料備份任務"});
    functors.push_back(f2);

    info.insert({functors.size(), "處理網絡連接配接任務"});
    functors.push_back(f3);
}

int main()
{
    // 0. 加在任務清單
    loadFunctor();
    // 1.建立管道
    int pipefd[2] = {0};
    if (pipe(pipefd) != 0)
    {
        cerr << "pipe error" << endl;
        return 1;
    }

    // 2.建立子程序
    pid_t id = fork();
    if (id < 0)
    {
        cerr << " fork error" << endl;
        return 2;
    }
    else if (id == 0)
    {
        // child read
        // 關閉不需要的fd
        close(pipefd[1]);

        while (true)
        {
            uint32_t operatorType = 0;

            //如果有資料就讀取 如果沒有資料就阻塞等待
            ssize_t s = read(pipefd[0], &operatorType, sizeof(uint32_t));
            if(s == 0) 
            {
                cout << "我是子程序,我要退出了" <<endl;
                break;
            }
            assert(s == sizeof(uint32_t));
            // assert斷言 是編譯有效的 debug模式
            //但是如果是release模式下 斷言就沒有了
            //一旦斷言沒有了,s變量就是隻被定義沒有被使用,一次你release模式下 可能有warning
            (void)s;

            if (operatorType < functors.size())
            {
                functors[operatorType]();
            }
            else
            {
                cerr << "bug? operatorTypr: " << operatorType << cout;
            }
        }

        close(pipefd[0]);
        exit(0);
    }
    else
    {
        srand((long long)time(nullptr));
        close(pipefd[0]);
        int num = functors.size();
        int cnt = 10;
        while (cnt--)
        {
            //形成任務碼
            uint32_t commandCode = rand() % num;
            cout<< "父程序指派任務完成,任務是: " <<info[commandCode] <<
            "任務的編号是:" << cnt << endl;
            //想指定程序下答操作的任務
            write(pipefd[1], &commandCode, sizeof(uint32_t));
            sleep(1);
        }
        close(pipefd[1]);
        pid_t res = waitpid(id, nullptr, 0);
        if (res)
            cout << " wait sucess  " << endl;
    }

    return 0;
}      
[ Linux ] 程式間通信介紹 管道

通過這份代碼父程序控制了子程序的行為,往後我們隻需要修改functors裡面的方法,就可以讓子程序執行指定的任務。

程序池 -- 池化概念

那麼如果是一個父程序想要控制一批子程序呢??? 父程序怎麼樣把一批任務交給子程序呢?

[ Linux ] 程式間通信介紹 管道

是以,我們有多少個程序,我們就建立多少個管道,父程序可以通過對指定管道寫入特定的任務,讓指定的子程序做對應的事情,這樣我們就引入了一個池化的概念!那麼我怎麼如何書寫對應的代碼呢?

int processNum  = 5;

int main()
{
    for(int i = 0;i<processNum;++i)
    {
        //定義儲存管道fd的對象
        int pipefd[2] = {0};
        //建立管道
        pipe(pipefd);
        pid_t id = fork();
        if(id == 0 )
        {
            //子程序執行
            exit(0);
        }

        //父程序做得事情
    }

    return 0;
}      

上面這份代碼就成功的将每一次建立的子程序和父程序都獨立的進行了控制,是以父程序在不斷的循環,給子程序指派任務的時候需要知道給哪一個程序指派,指派什麼任務,通過什麼指派呢? 是以我們接下來需要做的就是解決這些問題。是以我們需要一節pair結構

#include <iostream>
#include <vector>
#include <unordered_map>
#include <cstdio>
#include <cstring>
#include <cstdlib>
#include <ctime>
#include <sys/wait.h>
#include <sys/types.h>
#include <unistd.h>
#include <cassert>

using namespace std;

// 定義一個函數指針
typedef void (*functor)();

vector<functor> functors; //方法的集合

// for debug
unordered_map<uint32_t, string> info;

void f1() { cout << "這是一個處理日志的任務,執行的程序id: [" << getpid() << "]"
                 << "執行的時間是:[" << time(nullptr) << "]\n\n" << endl; }

void f2() { cout << "這是一個處理資料備份的任務,執行的程序id: [" << getpid() << "]"
                 << "執行的時間是:[" << time(nullptr) << "]\n\n" << endl; }

void f3() { cout << "這是一個處理網絡連接配接的任務,執行的程序id: [" << getpid() << "]"
                 << "執行的時間是:[" << time(nullptr) << "]\n\n" << endl; }

void loadFunctor()
{
    info.insert({functors.size(), "處理日志任務"});
    functors.push_back(f1);

    info.insert({functors.size(), "資料備份任務"});
    functors.push_back(f2);

    info.insert({functors.size(), "處理網絡連接配接任務"});
    functors.push_back(f3);
}

//第一個int32_t:程序pid
//第二個int32_t:該程序對應的管道寫端fd
typedef pair<int32_t, int32_t> elem;
int processNum = 5;

void work(int blockFd)
{
    cout << "程序 [" << getpid() << "] 開始工作" << endl;
    //進行
    while (true)
    {
        //阻塞等待 擷取任務資訊
        uint32_t operatorCode = 0;
        ssize_t s = read(blockFd, &operatorCode, sizeof(uint32_t));
        if (s == 0)
            break;
        assert(s == sizeof(uint32_t));
        (void)s;

        //處理任務
        if (operatorCode < functors.size())
        {
            functors[operatorCode]();
        }
    }
    cout << "程序 [" << getpid() << "] 結束工作" << endl;
}
// [子程序的pid,子程序的管道fd]
void sendTask(const vector<elem> & processFds)
{
    srand((long long)time(nullptr));
    while (true)
    {
        sleep(1);

        //選擇一個程序 選擇程序是随機的 沒有壓着一個程序給任務 -- 随機的
        //較為均勻的将任務給所有的子程序 --- 負載均衡
        uint32_t pick = rand() % processFds.size();

        // 選擇任務
        uint32_t task = rand() % functors.size();

        //把任務給一個指定的程序
        write(processFds[pick].second, &task, sizeof(task));

        //列印對應的提示資訊
        cout << "父程序指派任務 --> " << info[task] << "給程序:" << processFds[pick].first
             << "編号:" << pick << endl;
    }
}

int main()
{
    loadFunctor();
    vector<elem> assignMap;
    for (int i = 0; i < processNum; ++i)
    {
        //定義儲存管道fd的對象
        int pipefd[2] = {0};
        //建立管道
        pipe(pipefd);
        pid_t id = fork();
        if (id == 0)
        {
            //子程序讀取
            close(pipefd[1]);
            //子程序執行
            work(pipefd[0]);
            close(pipefd[0]);
            exit(0);
        }

        //父程序做得事情
        close(pipefd[0]);
        elem e(id, pipefd[1]);
        assignMap.push_back(e);
    }

    cout << "create all process success !" << endl;
    //父程序,派發任務
    sendTask(assignMap);

    // 回收資源
    for (int i = 0; i < processNum; ++i)
    {
        if (waitpid(assignMap[i].first, nullptr, 0) > 0)
            cout << "wait for : pid =  " << assignMap[i].first << " wait success"
                 << "number " << i << endl;
        close(assignMap[i].second);
    }
    return 0;
}      
[ Linux ] 程式間通信介紹 管道

我們發現此時父程序5個子程序随機的派發不同的任務,這就是一種程序池。至此匿名管道全部寫完。

其中,我們在shell指令行中寫的 | 就是匿名管道

1.3管道的特征總結

  1. 管道隻能用來進行具有血緣關系的程序之間,進行程序間通信,常用語父子通信。
  2. 管道隻能單向通信(由核心設計實作)半雙工的一種有特殊情況
  3. 管道自帶同步機制(pipe滿,write等;pipe空,read滿) -- 自帶通路控制
  4. 管道是面向位元組流的 -- 先寫的字元 一定是先被讀取的 沒有格式邊界 需要使用者來自定義區分内容的邊界 [sizeof(uint32_t)]
  5. 管道的生命周期随程序 -- 管道是檔案 -- 程序退出了 曾經打開的檔案也會退出
[ Linux ] 程式間通信介紹 管道

1.4命名管道

我們剛剛提到的都是父子間(血緣)通信,如果我們想要兩個毫不相幹的兩個程序之間通信,應該怎麼辦。是以我們接下來要講的就是命名管道。命名管道和之前的匿名管道最大的差別就是可以讓任意兩個程序之間通信。

1.4.1建立一個命名管道

  • 建立一個命名管道可以在指令上建立 使用如下這個指令

mkfifo filename

[ Linux ] 程式間通信介紹 管道

這個myfifo就是一個管道檔案,前面以p開頭。以p開頭就是管道,假如我們現在要在左側想管道内部寫入一些東西,在右側實時檢視,當我們左側回車按下時候,右側立馬出現了“aaaaaa”

[ Linux ] 程式間通信介紹 管道
[ Linux ] 程式間通信介紹 管道

但是這樣還是不能很好的觀察現象,我們在右側寫一個實時的檢視腳本,讓一直想管道檔案内部寫入 bbbbb

while :; do echo "bbbbb" ; sleep 1; done >> myfifo

[ Linux ] 程式間通信介紹 管道
[ Linux ] 程式間通信介紹 管道
  • 命名管道也可以在程式中建立,相關函數

int mkfifo(const char* filename,mode_t mode);

[ Linux ] 程式間通信介紹 管道
[ Linux ] 程式間通信介紹 管道

我們發現命名管道是帶路徑的,這有什麼作用呢? 其實命名管道是通過一個fifo檔案,由于這個檔案存在路徑,我們都知道路徑具有唯一性,是以通過路徑我們程序都可以看到這一份資源。

1.4.2 命名管道編碼

由于我們知道命名管道可以實作兩個不想關的程序完成通信,是以我們接下來将寫兩個檔案(程序),讓這兩個檔案進行通信。我們想讓clientFifo.cpp這個程序和severFifo.cpp這個程序通過命名管道通信,該怎麼寫呢?

[ Linux ] 程式間通信介紹 管道
[ Linux ] 程式間通信介紹 管道
#pragma once 
#include <iostream>
#include <cstdio>
#include <string>
#include <cstring>
#include <cerrno>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>


#define IPC_PATH "./.fifo"      
.PHONY:all
all: clientFifo severFifo

clientFifo:clientFifo.cpp
  g++ -Wall -o $@ $^ -std=c++11

severFifo:severFifo.cpp
  g++ -Wall -o $@ $^ -std=c++11
  
.PHONY:clean
clean:
  rm -f clientFifo severFifo .fifo      
//寫入
#include "comm.h"

using namespace std;

int main()
{
    int pipeFd = open(IPC_PATH,O_WRONLY);
    if(pipeFd < 0)
    {
        cerr<<"Open : " << strerror(errno) <<endl;
        return 1;
    }

#define NUM 1024
    char line[NUM];
    //通信
    while(true)
    {
        printf("請輸入你的消息# ");
        fflush(stdout);
        memset(line,0,sizeof(line));
        //從鍵盤按行為機關讀取
        if(fgets(line,sizeof(line),stdin) != nullptr )
        {
            line[strlen(line) - 1] = '\0';
            write(pipeFd,line,strlen(line));
        }
        else
        {
            break;
        }
    }

    close(pipeFd);
    cout<< "用戶端退出啦"<<endl;
    return 0;
}      
//讓severFifo 來讀取
#include "comm.h"

using namespace std;

int main()
{
    umask(0);
    if(mkfifo(IPC_PATH,0600) != 0)
    {
        cerr<<"mkfifo client" <<endl;
        return 1;
    }

    int pipeFd = open(IPC_PATH,O_RDONLY);
    if(pipeFd < 0)
    {
        cerr << "open fifo error" << endl;
        return 2;
    }
    //正常通信
    char buffer[1024];
    while(true)
    {
        ssize_t s = read(pipeFd,buffer,sizeof(buffer) - 1);
        if(s > 0 )
        {
            buffer[s] = '\0';
            cout<<" 用戶端-->伺服器#  " << buffer << endl;
        }
        else if(s == 0)
        {
            cout<< "用戶端退出了,我也退出了";
            break;
        }
        else
        {
            //do nothing
            cout << "read error" <<  strerror(errno) <<endl;
            break;
        }
    }

    close(pipeFd);
    cout<< "服務端退出啦"<<endl;
    unlink(IPC_PATH);
    return 0;
}      

繼續閱讀