简介
IPC(Inter Process Communication,进程间通信)的方式总共有三种,分别是信号量、共享内存和消息队列,本文介绍前两种。
在Linux中,进程之间操作公共数据时,需要进行互斥操作,这种情况需要临界区。在《Linux高性能服务器编程》这本书中,给出了
semget
、
semop
和
semctl
三个函数进行有关的操作。但是,Linux官方不建议使用这三个旧的函数了,而是建议使用新的API;上述三个函数是针对有名字类型的信号量进行操作,有些情况下,还需要对公共内存区进行操作,此时需要无名类型的信号量,同样的,使用官方建议的新版的API。
进程的信号量机制
PV原语操作
信号量机制和PV原语操作,是进程互斥的基础。PV原语操作是指任意时刻任何进程或者线程只能通过PV原语对信号量进行操作,操作是原子的,不能有其他进程中断当前进程的操作。
假设有信号量S,V操作是
S = S + 1
, P操作是
S = S - 1
;如果
S = 0
,V操作阻塞,等待其他进程唤醒。最典型的应用是生产者消费者模型,生产者进行P操作,消费者V操作。
PV原语对应Linux的系统调用
在Linux中,
sem_t
类型表示一个信号量,是一个特有的数据类型。
V操作对应的函数
#include <semaphore.h>
int sem_post(sem_t* sem);
需要添加
-pthread
连接。
sem
是信号量值,该函数使得
sem
加1,如果加1后
*sem > 0
而且有进程阻塞,那么会唤醒阻塞的进程。成功返回0,失败返回-1。
P操作对应的函数
#include <semaphore.h>
int sem_wait(sem_t* sem);
如果sem指向的信号量大于0,那么
*sem -= 1
,而且会立刻返回,如果当前
*sem == 0
,那么会阻塞,有可能使得信号量-1。
有名信号量和无名信号量区别与联系
两者都是信号量机制的,用于消息传递和互斥。
无名信号量只能存在于内存中,要求使用信号量的进程必须能访问信号量所在的这一块内存,所以无名信号量只能应用在同一进程内的线程之间(共享进程的内存),或者不同进程中已经映射相同内存内容到它们的地址空间中的线程(即信号量所在内存被通信的进程共享)。意思是说无名信号量只能通过共享内存访问。
有名信号量可以通过名字访问,因此可以被任何知道它们名字的进程中的线程使用。
单个进程中使用 POSIX 信号量时,无名信号量更简单。多个进程间使用 POSIX 信号量时,有名信号量更简单。
作者:ACool
链接:https://juejin.im/post/5b2908d2e51d4558e3600827
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
有名信号量实例
关于创建多个子进程,参考这篇博客:https://blog.csdn.net/qq_35976351/article/details/86584644
创建函数:
#include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <semaphore.h>
sem_t *sem_open(const char *name, int oflag);
sem_t *sem_open(const char *name, int oflag,
mode_t mode, unsigned int value);
name
是信号量的名称,如果
oflag
是
O_CREAT
,而且是第一次创建,则必须使用第二个函数,
mode
表示去权限,
value
是信号量的初始值;如果同样的
oflag
,已经存在,则使用第一个。
代码实例,父子进程使用信号量同步,父进程不使用
wait
函数。
#include <stdio.h>
#include <unistd.h>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/stat.h>
sem_t* sem;
const char* sem_name = "my_sem";
int main() {
sem = sem_open (sem_name, O_CREAT, 0660, 0);
int pid = fork();
if (pid < 0) {
perror ("fork() error\n");
return 1;
} else if (pid == 0) { // 子进程
sleep (3); // 续3秒
puts ("child");
sem_post (sem);
} else { // 父进程
sem_wait (sem);
puts ("parent");
sem_destroy(sem);
}
return 0;
}
无名信号量实例
无名信号的创建函数:
#include <semaphore.h>
int sem_init(sem_t* sem, int pshared, unsigned int value);
sem
是信号量地址。
pshared
是共享的标志,如果为0,表示一个进程内各个线程之间共享;不为零表示进程之间的共享。成功返回0,失败返回-1。
该函数需要和共享内存的进程共同使用,因为
sem
参数必须是在 共享内存上的。
进程间共享内存
这一部分指的是无名的信号量,即共享内存。
创建共享内存:
#include <sys/shm.h>
int shmget(key_t key, size_t size, int shmflg);
-
:一个键值,表示全局唯一的共享内存key
-
:共享内存的大小,单位是字节size
-
:一个标记,一般设置为shmflg
,具体查手册。这里说明两个特殊的:O_CREAT
-
:系统使用大页面为共享内存分配空间SHM_HUGETLB
-
:不为共享内存保留交换分区SHM_NORESERVE
-
函数成功返回正整数值,是共享内存的标识符;失败返回-1。
共享内存创建完毕,需要先关联到进程的地址空间中;使用完毕后,也需要将它从进程地址空间中分离。调用如下两个函数:
#include <sys/shm.h>
void* shmat(int shm_id, const void* shm_addr, int shmflg); // 关联函数
int shmdt(const void* shm_addr); // 分离函数
-
:由shm_id
返回的共享内存标识符shmget
-
:如果是shm_addr
,关联的地址由操作系统决定,推荐这么做;非空的情况比较复杂,查找手册即可。NULL
系统使用
shmctl
操作共享内存的属性,具体参考手册。
#include <sys/shm.h>
int shmctl(int shm_id, int command, struct shmid_ds* buf);
共享内存的POSIX方法:
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
int shm_open(const char* name, int oflag, mode_t mode);
int shm_unlink(const char* name);
类似于
mmap
方法,不过这里是具名共享内存。
name
是共享内存的名称,
oflag
是创建方式,具体参照手册,成功返回文件描述符,失败返回-1。第二个是取消连接,
name
同上。
共享内存的多进程聊天室代码实例:
#include <sys/shm.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#define USER_LIMIT 32
#define BUFFER_SIZE 1024
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define PROCESS_LIMIT 65536
struct client_data {
sockaddr_in address; // 客户端地址
int connfd; // socket文件描述符
pid_t pid; // 客户端进程的pid
int pipefd[2]; // 和父进程通信的管道
};
static const char* shm_name = "my_shm";
int sig_pipefd[2];
int epollfd = -1;
int listenfd = -1;
int shmfd = -1;
char* share_mem = 0;
client_data* users = 0; // 客户连接数据,进程客户连接的来索引数据
int* sub_process; // 子进程和客户连接的映射关系表,用进程的pid缩影这个数组
int user_count = 0; // 当前客户的数量
bool stop_child = false;
int setnonblocking (int fd) {
int old_option = fcntl (fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
assert (fcntl (fd, F_SETFL, new_option) != -1);
return old_option;
}
void addfd (int epollfd, int fd) {
epoll_event event;
bzero (&event, sizeof (event) );
event.data.fd = fd;
event.events |= EPOLLIN | EPOLLET;
assert (epoll_ctl (epollfd, EPOLL_CTL_ADD, fd, &event) != -1);
setnonblocking (fd);
}
void sig_handler (int sig) {
int save_errno = errno;
int msg = sig;
send (sig_pipefd[1], (char*) &msg, sizeof (char), 0);
errno = save_errno;
}
void addsig (int sig, void (*handler) (int), bool restart = true) {
struct sigaction sa;
bzero (&sa, sizeof (sa) );
sa.sa_handler = handler;
if (restart) {
sa.sa_flags |= SA_RESTART;
}
sigfillset (&sa.sa_mask);
assert (sigaction (sig, &sa, NULL) != -1);
}
void del_resource() {
close (sig_pipefd[0]);
close (sig_pipefd[1]);
close (listenfd);
close (epollfd);
shm_unlink (shm_name);
delete[] users;
delete[] sub_process;
}
// 停止一个子进程
void child_term_handler (int sig) {
stop_child = true;
}
// 子进程运行的函数,idx指出子进程处理的客户连接的编号,
// users保存客户连接数据的数组
// share_mem支持共享内存的起始地址
int run_child (int idx, client_data* users, char* share_mem) {
epoll_event events[MAX_EVENT_NUMBER];
memset (events, 0, sizeof (events) );
// 子进程IO复用同时监听两个文件描述符,
// 分别是客户连接socket和与父进程通信的管道文件描述符
int child_epollfd = epoll_create1 (0);
assert (child_epollfd != -1);
int connfd = users[idx].connfd;
int pipefd = users[idx].pipefd[1];
addfd (child_epollfd, pipefd);
int ret;
addsig (SIGTERM, child_term_handler, false);
while (!stop_child) {
int number = epoll_wait (child_epollfd, events, MAX_EVENT_NUMBER, -1);
if (number < 0) {
perror ("epoll failure\n");
break;
}
for (int i = 0; i < number; ++i) {
auto sockfd = events[i].data.fd;
auto event = events[i].events;
if (sockfd == connfd && event & EPOLLIN) {
memset (share_mem + idx * BUFFER_SIZE, 0, BUFFER_SIZE);
ret = recv (connfd, share_mem + idx * BUFFER_SIZE, BUFFER_SIZE - 1, 0);
if (ret < 0) {
if (errno != EINTR) {
stop_child = true;
}
} else if (ret == 0) {
stop_child = true;
} else {
// 通知主进程处理数据
send (pipefd, (char*) &idx, sizeof (idx), 0);
}
} else if (sockfd == pipefd && event & EPOLLIN) {
int client = 0;
ret = recv (sockfd, (char*) &client, sizeof (client), 0);
if (ret < 0) {
if (errno != EAGAIN) {
stop_child = true;
}
} else if (ret == 0) {
stop_child = true;
} else {
send (connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0);
}
} else {
continue;
}
}
}
close (connfd);
close (pipefd);
close (child_epollfd);
return 0;
}
int main (int argc, char* argv[]) {
if (argc != 2) {
perror ("Usage: %s <port> of server\n");
return 1;
}
int port = atoi (argv[1]);
if (port < 1024 || port > 65535) {
perror ("port error\n");
return 1;
}
struct sockaddr_in address;
bzero (&address, sizeof (address) );
address.sin_family = AF_INET;
address.sin_port = htons (port);
address.sin_addr.s_addr = htonl (INADDR_ANY);
listenfd = socket (AF_INET, SOCK_STREAM, 0);
if (listenfd < 0) {
perror ("socket() error\n");
return 1;
}
setnonblocking (listenfd); // 非阻塞监听
int ret = bind (listenfd, (struct sockaddr*) &address, sizeof (address) );
if (ret < 0) {
perror ("bind() error\n");
close (listenfd);
return 1;
}
ret = listen (listenfd, 32);
if (ret < 0) {
perror ("listen() error\n");
return 1;
}
user_count = 0;
users = new client_data[USER_LIMIT + 1];
sub_process = new int[PROCESS_LIMIT];
memset (users, -1, USER_LIMIT * sizeof (int) );
memset (sub_process, -1, PROCESS_LIMIT * sizeof (int) );
epoll_event events[MAX_EVENT_NUMBER];
memset (events, 0, sizeof (events) );
epollfd = epoll_create1 (0);
if (epollfd < 0) {
perror ("epoll_create1() error\n");
close (listenfd);
return 1;
}
addfd (epollfd, listenfd);
ret = socketpair (AF_UNIX, SOCK_STREAM, 0, sig_pipefd);
if (ret < 0) {
perror ("sockerpair() error\n");
close (epollfd);
close (listenfd);
return 1;
}
setnonblocking (sig_pipefd[1]);
addfd (epollfd, sig_pipefd[0]);
addsig (SIGCHLD, sig_handler);
addsig (SIGTERM, sig_handler);
addsig (SIGINT, sig_handler);
addsig (SIGPIPE, sig_handler);
bool stop_server = false;
bool terminate = false;
shmfd = shm_open (shm_name, O_CREAT | O_RDWR, 0666);
if (shmfd < 0) {
perror ("shm_open() error\n");
close (epollfd);
close (listenfd);
return 1;
}
share_mem = (char*) mmap (NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ |
PROT_WRITE, MAP_SHARED, shmfd, 0);
if (share_mem == MAP_FAILED) {
perror ("share_mem() error\n");
close (epollfd);
close (listenfd);
close (shmfd);
return 1;
}
while (!stop_server) {
int number = epoll_wait (epollfd, events, MAX_EVENT_NUMBER, -1);
if (number < 0 && errno != EINTR) {
perror ("epoll_wait() error\n");
break;
}
for (int i = 0; i < number; ++i) {
auto sockfd = events[i].data.fd;
auto event = events[i].events;
if (sockfd == listenfd) {
struct sockaddr_in client_address;
bzero (&address, sizeof (address) );
socklen_t client_addrlen = sizeof (client_address);
int connfd = accept (listenfd, (struct sockaddr*) &client_address,
&client_addrlen);
if (connfd < 0) {
printf ("errno is: %d\n", errno);
continue;
}
if (user_count >= USER_LIMIT) {
const char* info = "Too many users...";
printf ("%s\n", info);
send (connfd, info, strlen (info), 0);
close (connfd);
continue;
}
// 保存第user_count个客户连接的相关数据
users[user_count].address = client_address;
users[user_count].connfd = connfd;
// 父子进程建立管道,传递必要的数据
ret = socketpair (PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd);
if (ret < 0) {
perror ("socketpair() error\n");
close (connfd);
continue;
}
pid_t pid = fork();
if (pid < 0) {
close (connfd);
continue;
} else if (pid == 0) {
close (epollfd);
close (listenfd);
close (users[user_count].pipefd[0]);
close (sig_pipefd[0]);
close (sig_pipefd[1]);
run_child (user_count, users, share_mem);
munmap ( (void*) share_mem, USER_LIMIT * BUFFER_SIZE);
exit (0);
} else {
close (connfd);
close (users[user_count].pipefd[1]);
addfd (epollfd, users[user_count].pipefd[0]);
users[user_count].pid = pid;
// 记录新的客户连接在数组users中的索引值,
// 建立进程pid和该索引值之间的映射关系
sub_process[pid] = user_count;
++user_count;
}
} else if (sockfd == sig_pipefd[0] && event & EPOLLIN) {
// 处理信号事件
char signals[1024];
memset (signals, 0, sizeof (signals) );
ret = recv (sig_pipefd[0], signals, sizeof (signals), 0);
if (ret <= 0) {
continue;
} else {
for (int i = 0; i < ret; ++i) {
switch (signals[i]) {
case SIGCHLD: // 子进程退出,有客户端断开连接
pid_t pid;
int stat;
while ( (pid = waitpid (-1, &stat, WNOHANG) ) > 0) {
int del_user = sub_process[pid];
sub_process[pid] = -1;
if (del_user < 0 || del_user > USER_LIMIT) {
continue;
}
// 清除请求的相关数据
epoll_ctl (epollfd, EPOLL_CTL_DEL, \
users[del_user].pipefd[0], 0);
close (users[del_user].pipefd[0]);
users[del_user] = users[--user_count];
sub_process (users[del_user].pid) = del_user;
}
if (terminate && user_count == 0) {
stop_server = true;
}
break;
}
case SIGTERM:
case SIGINT: {
// 结束服务器程序
puts ("kill all child now");
if (user_count <= 0) {
stop_server = true;
break;
}
for (int i = 0; i < user_count; ++i) {
int pid = users[i].pid;
kill (pid, SIGTERM);
}
terminate = true;
break;
}
default:
break;
}
}
} else if (event & EPOLLIN) { // 写入数据
int child = 0;
ret = recv (sockfd, (char*) &child, sizeof (child), 0);
if (ret <= 0 ) {
continue;
} else {
for (int j = 0; j < user_count; ++j) {
if (users[j].pipefd[0] != sockfd) {
printf ("send data to child accross pipe\n");
send (users[j].pipefd[0], (char*) &child, sizeof (child), 0);
}
}
}
}
}
}
del_resource();
return 0;
}