天天看點

linux網絡程式設計--eventfd

eventfd 在核心版本,2.6.22以後有效。檢視核心版本可以用指令 uname -r。

eventfd類似于管道的概念,可以實作線程間的事件通知,類似于pipe。而eventfd 是一個比 pipe 更高效的線程間事件通知機制,一方面它比 pipe 少用一個 file descriper,節省了資源;另一方面,eventfd 的緩沖區管理也簡單得多,全部“buffer”一共隻有8位元組,不像pipe那樣可能有不定長的真正buffer。

eventfd的緩沖區大小是sizeof(uint64_t)也就是8位元組,它是一個64位的計數器,寫入遞增計數器,讀取将得到計數器的值,并且清零。

涉及API:

#include <sys/eventfd.h>
int eventfd(unsigned int initval, int flags);
           

這個函數會建立一個事件對象 (eventfd object), 用來實作程序(線程)間的等待/通知(wait/notify) 機制. 核心會為這個對象維護一個64位的計數器(uint64_t)。

并且使用第一個參數(initval)初始化這個計數器。調用這個函數就會傳回一個新的檔案描述符(event object)。2.6.27版本開始可以按位設定第二個參數(flags)。

flags 有如下的一些宏可以使用:

EFD_NONBLOCK:功能同open(2) 的O_NONBLOCK,設定對象為非阻塞狀态,如果沒有設定這個狀态的話,read(2)讀eventfd,并且計數器的值為0 就一直堵塞在read調用當中,要是設定了這個标志, 就會傳回一個 EAGAIN 錯誤(errno = EAGAIN)。效果也如同 額外調用select(2)達到的效果。

EFD_CLOEXEC:顧名思義是在執行 exec() 調用時關閉檔案描述符,防止檔案描述符洩漏給子程序。

如果是2.6.26或之前版本的核心,flags 必須設定為0。

建立這個對象後,可以對其做如下操作。

write 将緩沖區寫入的8位元組整形值加到核心計數器上。

read 讀取8位元組值, 并把計數器重設為0. 如果調用read的時候計數器為0, 要是eventfd是阻塞的, read就一直阻塞在這裡,否則就得到 一個EAGAIN錯誤。

如果buffer的長度小于8那麼read會失敗, 錯誤代碼被設定成 EINVAL。

close 當不需要eventfd的時候可以調用close關閉, 當這個對象的所有句柄都被關閉的時候,核心會釋放資源。 為什麼不是close就直接釋放呢, 如果調用fork 建立

程序的時候會複制這個句柄到新的程序,并繼承所有的狀态。

(ps:也就是說,在write之後沒有read,但是又write新的資料,那麼讀取的是這兩次的8個位元組的和,在read之後再write,可以完成read和write之間的互動)

下面看一個簡單的eventfd的示例:

#include <sys/eventfd.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>             /* Definition of uint64_t */

#define handle_error(msg) \
   do { perror(msg); exit(EXIT_FAILURE); } while (0)

int
main(int argc, char *argv[])
{
   int efd = eventfd(0, 0);
   int j;
   if (efd == -1)
       handle_error("eventfd");

   int ret = fork();
   if(ret == 0) //child
   {
       for (j = 1; j < 10 ; j++) {
           printf("Child writing %d to efd\n", j);
           uint64_t one = j;
           ssize_t s = write(efd, &one, sizeof one);
           if (s != sizeof one)
               handle_error("write");
       }
       printf("Child completed write loop\n");

       exit(EXIT_SUCCESS);
   }
   else  //parent
   {
       sleep(2);
       uint64_t one;
       ssize_t s = read(efd, &one, sizeof one);
       if (s != sizeof one)
           handle_error("read");
       printf("Parent read %llu from efd\n",(unsigned long long)one);
       exit(EXIT_SUCCESS);
   }
}
           

輸出:

linux網絡程式設計--eventfd

解釋:

這個例子很簡單,建立一個eventfd用于父子程序之間通信,子程序寫入資料,然後父程序讀取。例子并沒有什麼實際意義。

我們再看一個稍微複雜一點的例子,使用epoll和eventfd:

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/time.h>
#include <stdint.h>
#include <pthread.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>

#define EPOLL_MAX_NUM 10

#define handle_error(msg) \
   do { perror(msg); exit(EXIT_FAILURE); } while (0)

int efd = -1;

void *read_thread(void *arg)
{
	int ret = 0;
	uint64_t count = 0;
	int ep_fd = -1;
	struct epoll_event events[EPOLL_MAX_NUM];

	if (efd < 0)
	{
		printf("efd not inited.\n");
		return;
	}

	ep_fd = epoll_create(1024);
	if (ep_fd < 0)
	{
		handle_error("epoll_create fail: ");
	}

	{
		struct epoll_event read_event;

		read_event.events = EPOLLIN;
		read_event.data.fd = efd;  //add eventfd to epoll

		ret = epoll_ctl(ep_fd, EPOLL_CTL_ADD, efd, &read_event);
		if (ret < 0)
		{
			handle_error("epoll ctl failed:");
		}
	}

	while (1)
	{
		ret = epoll_wait(ep_fd, &events[0], EPOLL_MAX_NUM, -1);
		if (ret > 0)
		{
			int i = 0;
			for (; i < ret; i++)
			{	/*
				if (events[i].events & EPOLLHUP)
				{
					printf("epoll eventfd has epoll hup.\n");
				}
				else if (events[i].events & EPOLLERR)
				{
					printf("epoll eventfd has epoll error.\n");
				}
				else */
				 if (events[i].events & EPOLLIN)
				{
					int event_fd = events[i].data.fd;
					ret = read(event_fd, &count, sizeof(count));
					if (ret < 0)
					{
						handle_error("read fail:");
					}
					else
					{
						struct timeval tv;

						gettimeofday(&tv, NULL);
						printf("success read from efd, read %d bytes(%llu) at %lds %ldus\n",
							   ret, count, tv.tv_sec, tv.tv_usec);
					}
				}
			}
		}
		else if (ret == 0)
		{
			/* time out */
			printf("epoll wait timed out.\n");
			break;
		}
		else
		{
			handle_error("epoll wait error:");
		}
	}

}

int main(int argc, char *argv[])
{
	pthread_t pid = 0;
	uint64_t count = 0;
	int ret,i;

	efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
	if (efd < 0)
	{
		handle_error("eventfd failed.");
	}

	ret = pthread_create(&pid, NULL, read_thread, NULL);
	if (ret < 0)
	{
		handle_error("pthread create:");
	}

	for (i = 0; i < 5; i++)
	{
		count = 4;
		ret = write(efd, &count, sizeof(count));
		if (ret < 0)
		{
			handle_error("write event fd fail:");
		}

		sleep(1);
	}
	
	printf("write_end\n");	
	
	pthread_join(pid, NULL);

	close(efd);
	return 0;
}
           

繼續閱讀