天天看点

61-System V 信号量(生产者与消费者模型)

学过 OS 的同学一定知道这个模型,这里再简单回忆一下。最后给出基于 System V 信号量的完整实现。

1. PV 原语

  • P(S) :表示将资源 S 数量减 1,即 S = S - 1. 如果 S <= 0,该进程进入等待。
  • V(S):表示将资源 S 数量加 1,即 S = S + 1. 如果 S > 0,该进程继续执行,否则释放一个正在等待的进程。

PV 操作是原子的,原子操作可以理解为一次性执行完若干条机器指令,中间不能打断。如果一个操作不是原子的,就可能会产生竞态错误。

2. 生产者与消费者模型

2.1 问题描述

假设有一个缓冲区,该缓冲区最多只能存放 5 个蛋糕。有若干生产者(producer),每个生产者一次生产一个蛋糕,如果发现缓冲区有空位,就将蛋糕放进去。另外还有若干消费者(consumer),每个消费者如果发现缓冲区有蛋糕,就拿走一个蛋糕。

61-System V 信号量(生产者与消费者模型)

图1 3 个生产者和 4 个消费者

2.2 初步解决

要完成这个功能,直观上我们会写出下面的代码:

  • 生产者进程
while(1) {
  // 为了方便演示,这里只用另一个共享内存中的变量来统计缓冲区中蛋糕的个数。
  if (cake < 5) { 
    cake++;
  }
}      
  • 消费者进程
while(1) {
  if (cake > 0) {
    cake--;      

如果程序真的就这样写,会有什么问题?

假设某个时刻 ​

​cake == 1​

​​,有一个消费者进程 C1 执行到 ​

​if (cake > 0)​

​进入 if 语句块后,进程由于时间片耗尽被切到到另一个消费者进程 C2,此时 cake 仍然等于 1,C2 同样也进入了 if 语句块。此时面临的问题是,C1 和 C2 都拿走一块蛋糕,结果 cake 被减 2 次,变成了 -1. 这种情况是不允许出现的。

2.3 改进版本一

这里的 cake 明显是竞争资源,在任意一个时刻,只能有一个进程去操作它。上面的代码可以改进为:

  • 生产者进程
while(1) {
  P(MUTEX);
  if (cake < 5) { 
    cake++;
  }
  V(MUTEX);
}      
  • 消费者进程
while(1) {
  P(MUTEX);
  if (cake > 0) {
    cake--;      

上面改进的代码里引入了信号量 MUTEX,用来表示资源 cake 是否被占用,MUTEX 的初始值为 1. 上面的程序就可以达到对 cake 互斥访问的目的了。

如此程序就没什么问题了,可是,还有一些细节上的问题?假如说蛋糕数为 0 了,操作系统仍然会调度消费者进程!那么问题来了,消费者进程还有必要调度?这不是对 CPU 的浪费吗?假如只有一个两个消费者进程还好说,如果有几十个,上百个这样的进程呢?

2.4 改进版本二

我们希望做到,如果蛋糕数为 0,消费者进程就去睡觉吧!别再被操作系统调度了,什么时候有蛋糕了,再唤醒它们!

这里再次引入信号量 FULL 和 EMPTY。FULL 表示蛋糕的个数,初始值为 0. EMPTY 表示空缓冲区的个数,初始值为 5.

改进前面的程序:

  • 生产者进程
while(1) {
  P(EMPTY); // 减少一个空缓冲区个数
  P(MUTEX);
  if (cake < 5) { 
    cake++;
  }
  V(MUTEX);
  V(FULL); // 增加一个蛋糕个数      
  • 消费者进程
while(1) {
  P(FULL); // 减少一个蛋糕个数
  P(MUTEX);
  if (cake > 0) {
    cake--;
  }
  V(MUTEX);
  V(EMPTY); // 增加一个空缓冲区个数      

如果用来描述蛋糕个数的信号量 FULL <= 0 了,消费者执行到 P(FULL) 就会立即被投入等待状态,不再被 OS 调度。什么时候 FULL > 0 了,才可能会被 OS 调度。

3. 基于 System V 信号量的实现

3.1 PV 操作封装

  • 头文件 semutil.h
// semutil.h
#ifndef __SEMUTIL_H__
#define __SEMUTIL_H__

#include <unistd.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <stdio.h>
#include <stdlib.h>

#define ASSERT(prompt,res) if((res)<0){perror(#prompt);exit(-1);}

/* 
 * Create
 * 创建和获取信号量 ipc 内核对象 id
 * count > 0 表示创建,count = 0 表示获取
 */
int C(int count);

/*
 * Set
 * 初始化第 semnum 个信号量的值为 val
 */
void S(int id, int semnum, int val);

/*
 * Get
 * 获取第 semnum 个信号量的值
 */
int G(int id, int semnum);

/*
 * Delete
 * 删除信号量内核对象
 */
void D(int id);

/*
 * 请求第 semnum 个信号量,将其值减 1
 */
void P(int id, int semnum); 

/*
 * 归还第 semnum 个信号量,将其值加 1
 */
void V(int id, int semnum);

#endif //__SEMUTIL_H__      
  • 实现文件 semutil.c
// semutil.c
#include "semutil.h"

int C(int count) {
  int id; 
  if (count > 0)  
    id = semget(0x8888, count, IPC_CREAT | IPC_EXCL | 0664);
  else
    id = semget(0x8888, 0, 0); 
  ASSERT(semget, id);
  return id; 
}

void S(int id, int semnum, int val) {
  ASSERT(semctl, semctl(id, semnum, SETVAL, val));
}

void D(int id) {
  ASSERT(semctl, semctl(id, 0, IPC_RMID));
}

void P(int id, int semnum) {
  struct sembuf op; 
  op.sem_num = semnum;
  op.sem_op = -1; 
  op.sem_flg = 0;
  ASSERT(semop, semop(id, &op, 1));
}

void V(int id, int semnum) {
  struct sembuf op; 
  op.sem_num = semnum;
  op.sem_op = 1;
  op.sem_flg = 0;
  ASSERT(semop, semop(id, &op, 1));
}

int G(int id, int semnum) {
  return semctl(id, semnum, GETVAL);
}      

3.2 生产者与消费者程序

这个程序把生产者消费者写在一套代码里,运行的时候依参数来控制具体是运行生产者还是消费者。具体说明如下:

  • ​./pc -b​

    ​ : 初始化 ipc 内核对象
  • ​./pc -d​

    ​ : 删除 ipc 内核对象
  • ​./pc -p​

    ​ : 启动生产者进程
  • ​./pc -c​

    ​ : 启动消费者进程
  • 程序代码:
// pc.c
#include "semutil.h"
#include <string.h>
#include <sys/shm.h>

#define MUTEX 0
#define FULL 1
#define EMPTY 2

static void init() {
  int id = C(3);
  S(id, MUTEX, 1); 
  S(id, FULL, 0); 
  S(id, EMPTY, 5); 
  int shmid = shmget(0x8888, 4, IPC_CREAT | IPC_EXCL | 0664);
  ASSERT(shmget, shmid);
  int *cake= shmat(shmid, NULL, 0); 
  if (cake == (int*)-1) ASSERT(shmat, -1); 
  *cake = 0;
  ASSERT(shmdt, shmdt(cake));
}

static int getsemid() {
  return C(0);
}

static int getshmid() {
  int id = shmget(0x8888, 0, 0); 
  ASSERT(shmget, id);
  return id; 
}

static void release(int id) {
  D(id);
  ASSERT(shmctl, shmctl(getshmid(), IPC_RMID, NULL));
}

static void producer() {
  int id = getsemid();
  int shmid = getshmid();
  int *cake = shmat(shmid, NULL, 0); 
  while(1) {
    P(id, EMPTY);
    P(id, MUTEX);
    printf("current cake = %d, ", *cake);
    (*cake)++;
    printf("produce a cake, ");
    printf("cake = %d\n", *cake);
    V(id, MUTEX);
    V(id, FULL);
    sleep(1);
  }
  shmdt(cake);
}
static void consumer() {
  int id = getsemid();
  int shmid = getshmid();
  int *cake = shmat(shmid, NULL, 0);
  int count = 10;
  while(count--) {
    P(id, FULL);
    P(id, MUTEX);
    printf("current cake = %d, ", *cake);
    (*cake)--;
    printf("consume a cake, ");
    printf("cake = %d\n", *cake);
    V(id, MUTEX);
    V(id, EMPTY);
  }
  shmdt(cake);
}

int main(int argc, char *argv[]) {
  if (argc < 2) {
    printf("usage: %s <option -b -d -p -c>\n", argv[0]);
    return -1;
  }

  if (!strcmp("-b", argv[1])) {
    init();
  }
  else if (!strcmp("-d", argv[1])) {
    release(getsemid());
  }
  else if (!strcmp("-p", argv[1])) {
    producer();
  }
  else if (!strcmp("-c", argv[1])) {
    consumer();
  }
  return 0;
}      
  • 编译
$ gcc pc.c semuti.c      
  • 运行结果
61-System V 信号量(生产者与消费者模型)

图2 生产者与消费者

4. 总结

  • 掌握 PV 原语的概念
  • 理解生产者与消费者模型
  • 完成基于 System V 信号量的实现
  • 生产者进程
while(1) {
  P(MUTEX);
  P(EMPTY); // 减少一个空缓冲区个数
  if (cake < 5) { 
    cake++;
  }
  V(FULL); // 增加一个蛋糕个数
  V(MUTEX);  
}      
  • 消费者进程
while(1) {
  P(MUTEX);
  P(FULL); // 减少一个蛋糕个数
  if (cake > 0) {
    cake--;
  }
  V(EMPTY); // 增加一个空缓冲区个数
  V(MUTEX);
}      

继续阅读