天天看點

Linux開發——檔案管理,資料通信,io複用

目錄

基礎

IO複用機制

         select機制:

select服務端

poll機制

poll例子

epoll 基礎

基礎

setvbuff函數:設定id緩沖區

                      例:setvbuff(fp_src,buffer,_IOLBF,128);

_IOFBF   全緩沖區: BUFSIZE ffliush 預設開啟

_IOLBF   行緩沖區:遇到換行符重新整理

_IONBF   無緩沖區:read write strerr

指針緩沖區: setbuf(FILE * stream,char*buf)  buf必須指向一個長度為BUFSIZE 大小的緩沖區

socket的io模型

            1.非阻塞io處理 2.阻塞io處理  3.io複用的io方式  4.異步io  5.信号驅動式io

IO複用機制

            select機制:

                int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout); //檔案最多不能超過2000

                第一個參數:所有檔案夾檔案數加一     2:等待讀檔案夾   3:等待寫檔案夾  4:等待錯誤檔案夾 5 :select逾時時間

                第五個參數:         

                                    struct timeval {

                                    long    tv_sec;        

                                    long    tv_usec;        

                                    };

               傳回值-1:錯誤      0:逾時       >=1:可操作性檔案個數

           使用方法:

              1.建立fd_set:     struct fd_set rfds;

              2.設定fd_set:  FD_ZERO(&rfds)       加入句柄FD_SET(fd,&rfds)        移除FD_CLR(int fd ,fd_set * set)

              3.使用:select()   accept()等待      查詢 int FD_ISSET(int fd ,fd_set * set)

              注意:1:每次重新發起select  需要重新添加fd_set

                         2.将放入到select中的句柄都設定為非阻塞狀态 

select服務端

#include <stdlib.h>
#include <sys/types.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/wait.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/time.h>

#define MAXBUF 1024

int main(int argc,char *argv[])
{
	int sockfd,new_fd;
	socklen_t len;
	struct sockaddr_in my_addr,their_addr;
	unsigned int myport,lisnum;
	char buf[MAXBUF+1];
	fd_set rfds;
	struct timeval tv;
	int retval,maxfd= -1;

	if (argv[2])
	{
		myport = atoi(argv[2]);
	}else 
		myport = 6666;

	if (argv[3])
	{
		lisnum = atoi(argv[3]);
	}else
		lisnum = 2;
	if ((sockfd = socket(AF_INET,SOCK_STREAM,0)) == -1)
	{
		perror("socket fail!\n");
		exit(EXIT_FAILURE);
	}

	bzero(&my_addr,sizeof(my_addr));
	my_addr.sin_family = PF_INET;
	my_addr.sin_port = htons(myport);
	if (argv[1])
	{
		my_addr.sin_addr.s_addr = inet_addr(argv[1]);	
	}else 
		my_addr.sin_addr.s_addr = htonl(INADDR_ANY);

	if (bind(sockfd,(struct sockaddr *)&my_addr,sizeof(struct sockaddr))==-1)
	{
		perror("socket bind fail!\n");
		exit(EXIT_FAILURE);
	}
	if (listen(sockfd,lisnum)==-1)
	{
		perror("socket fail!\n");
		exit(EXIT_FAILURE);
	}
	while(1)
	{
		printf("\n----------wait for new socket connect-----------\n");
		len = sizeof(struct sockaddr);
		if((new_fd = accept(sockfd,(struct sockaddr*)&their_addr,&len))==-1)
		{
			perror("socket fail!\n");
		    exit(EXIT_FAILURE);
		}else
		printf("server: got connect from%s,port%d,socket%d\n",inet_ntoa(their_addr.sin_addr),ntohs(their_addr.sin_port),new_fd);

		while(1){
			FD_ZERO(&rfds);
			FD_SET(0,&rfds);//将标準輸入放入檔案夾
			FD_SET(new_fd,&rfds);
			maxfd = new_fd;
			tv.tv_sec = 1;
			tv.tv_usec = 0;
			retval = select(maxfd+1,&rfds,NULL,NULL,&tv);
			if(retval == -1)
			{
				perror("select fail!\n");
				exit(EXIT_FAILURE);
			}else if(retval == 0){
				continue;
			}
			else{
				if (FD_ISSET(0,&rfds))
				{
					 bzero(buf,MAXBUF +1);
					 fgets(buf,MAXBUF,stdin);
					 if (!strncasecmp(buf,"quit",4))
					 {
					 	printf("i will quit!\n");
					 	break;
					 }
					 len = send(new_fd,buf,strlen(buf)-1,0);
					 if(len>0)
					 	printf("send successful,%d byte send!\n",len);
					 else
					 {
					 	printf("send failure\n");
					 }
				}
				if (FD_ISSET(new_fd,&rfds))
				{
					bzero(buf,MAXBUF+1);
					len = recv(new_fd,buf,MAXBUF,0);
					if (len>0)
					{
						printf("recv success! value:%s len:%d\n",buf,len);
					}else{
						if (len<0)
						{
							printf("recv failure\n");
						}
						else{
						printf("the other one end,quit\n");
						break;
						}
					}
				}
			}	
		}
		close(new_fd);
		printf("need othe connect (no-quit)\n");
		fflush(stdout);
		bzero(buf ,MAXBUF+1);
		fgets(buf,MAXBUF,stdin);
		if (!strncasecmp(buf,"no",2))
		{
			printf("quit!\n");
			break;
		}
	}
	close(new_fd);
	close(sockfd);
	fflush(stdout);
	fflush(stdin);
	printf("system quit!\n");
	return 0;
}
           

poll機制

struct pollfd

                  { 

                   int fd      目前句柄

                   short events  句柄綁定的事件

                   short revents  傳回事件的類型

                  }

#define POLLIN 0x0001          輸入事件

#define POLLPRI 0x0002        優先級

#define POLLOUT 0x0003

#define POLLERR 0x0004

#define POLLIHUP 0x0005

#define POLLINUAL 0x0006

pollfd 

          .fd = listen_fd

           .events = POLLIN|POLLPRI

int poll (struct pollfd *fds,nfds_t nfds,int timeout)

            第一個參數:poll連結清單         2:多少個等待       3:逾時時間 毫秒

poll例子

#include "stdio.h"
#include "poll.h"
#include "string.h"

int main(int argc,char*argv[])
{
        int timeout = 3000;
        char buf[1024];
        struct pollfd fd_poll[1];

        while(1){
                fd_poll[0].fd = 0;
                fd_poll[0].events = POLLIN;
                fd_poll[0].revents = 0;
                memset(buf,'\0',sizeof(buf));
                switch(poll(fd_poll,1,timeout)){
                        case 0:
                                printf("逾時!\n");
                        break;
                        case -1:
                                printf("錯誤\n");
                        break;
                        default:
                                if (fd_poll[0].revents&POLLIN)
                                {
                                        gets(buf);
                                        printf("buf:%s\n",buf);
                                        if (!strncasecmp(buf,"quit",4))
                                        {
                                                printf("Game Over!!!\n");
                                                return 0;
                                        }
                                }
                        break;
                }
        }
        return 0;
}
  
           

 epoll 基礎

并發性高,可同時監聽c10k以上級别的客戶事件

 實作原理:fd----event

大規模并發伺服器架構:epoll + threadpoll+mysql

對象epoll_fd epoll_event

建立epollfd對象

       int epoll_creat(int size);傳回值 epoll_fd epoll_event      size 最大監聽多少

設定epolldf對象

      建立 epoll_event對象

              struct epoll_event ep_ev;

       設定 epoll_event對象

              ep_ev.events = EPOLLIN;//資料的讀取

               ep_ev.data.fd = listen_sock;

        使用 epoll_event對象

                epoll_ctl(int epfd,  int op,   int fd ,  struct epoll_event event);

                epoll_ctl(epoll_fd,   EPOLL_CTL_ADD,   listen_sock,  &ep_ev);

                                    EPOLL_CTL_ADD           EPOLL_CTLMOD            EPOLL_CTL_DEL

使用epollfd對象;

          int epoll_wait(int eplf,  struct epoll_event * events, int maxevents, int timeout);

//第二個參數epoll_event * events 是用來做傳回的

type union epoll_data{

         void         *ptr;

         int             fd;

         uint32_t    u32;

          uint64_t    u64;

}epoll_data_t;

struct epoll_event{

         uint32_t            events;

         epoll_data_t      data;

}

#include "stdio.h"
#include "unistd.h"
#include "sys/types.h"
#include "sys/socket.h"
#include "netinet/in.h"
#include "arpa/inet.h"
#include "sys/epoll.h"
#include "fcntl.h"
#include "stdlib.h"
#include <string.h>


int creat_socket(char *ip,char *port)
{
        int sock = socket(AF_INET,SOCK_STREAM,0);
        if(sock<0){
                perror("socket");
                exit(2);
        }
        int opt = 1;
        //設定socket 先斷開時 避免進入time_wait狀态,屬性SO_REUSEADDR,是使其位址能夠重用
        if(setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt))<0)
        {
                perror("setsockopt");
                exit(3);
        }
        struct sockaddr_in local;
        local.sin_family = AF_INET;
        local.sin_port = htons(atoi(port));
        local.sin_addr.s_addr = inet_addr(ip);
        if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
        {
                perror("bind");
                exit(4);
        }
        if(listen(sock,5)<0){
                perror("listen");
                exit(5);
        }
        printf("listen and bind succeed\n");
        return sock;
}
int set_noblock(int sock)
{
        int fl = fcntl(sock,F_GETFL);
        return fcntl(sock,F_SETFL,fl|O_NONBLOCK);
}
int main(int argc,char *argv[])
{
        if(argc != 3)
        {
                printf("please use:%s [ip] [port]\n",argv[0]);
                exit(1);
        }
        int listen_sock = creat_socket(argv[1],argv[2]);
        int epoll_fd = epoll_create(256);
        if(epoll_fd <0)
        {
                perror("epoll_create");
                exit(6);
        }
        struct epoll_event ep_ev;
        ep_ev.events = EPOLLIN;
        ep_ev.data.fd = listen_sock;
        if (epoll_ctl(epoll_fd,EPOLL_CTL_ADD,listen_sock,&ep_ev)<0)
        {
                perror("epoll_ctl");
                exit(7);
        }

        struct epoll_event ready_ev[128];
        int maxnum = 128;
        int timeout = 1000;//設定逾時時間 若為-1則永久等待
        int ret = 0;
        int done = 0;
        while(!done)
        {
                switch(ret = epoll_wait(epoll_fd,ready_ev,maxnum,timeout))
                {
                        case -1:
                                perror("epoll_wait");
                                exit(8);
                        break;
                        case 0:
                        //      perror("time out ...\n");
                        break;
                        default:
                //      printf("a new sigle!\n");       
                                for (int i = 0; i < ret; ++i)
                                {
 //判斷是否是監聽套接字,是的話accept
                                        int fd = ready_ev[i].data.fd;
                                        if ((fd==listen_sock)&&(ready_ev[i].events&EPOLLIN))
                                        {
                                                struct sockaddr_in remote;
                                                socklen_t len = sizeof(remote);

                                                int accept_sock = accept(listen_sock,(struct sockaddr*)&remote,&len);
                                                if (accept<0)
                                                {
                                                        perror("accept");
                                                        continue;
                                                }
                                                printf("accept a clinent..[ip]:%s,[port]:%d\n",inet_ntoa(remote.sin_addr),ntohs(remote.sin_port));
                                                ep_ev.events = EPOLLIN|EPOLLET;
                                                ep_ev.data.fd = accept_sock;
                                                set_noblock(accept_sock);
                                                if(epoll_ctl(epoll_fd,EPOLL_CTL_ADD,accept_sock,&ep_ev)<0)
                                                {
                                                        perror("epoll_ctl");
                                                        close(accept_sock);
                                                }
                                        }else{
                                                if(ready_ev[i].events&EPOLLIN){
                                                        //申請空間同時存檔案描述符合緩沖區位址
                                                        char buf[102400];
                                                        memset(buf,'\0',sizeof(buf));

                                                        ssize_t _s = recv(fd,buf,sizeof(buf)-1,0);
                                                        if (_s<0)
                                                        {
                                                                perror("recv");
                                                                continue;
                                                        }else if(_s==0){
                                                                printf("remote close..\n");
                                                                epoll_ctl(epoll_fd,EPOLL_CTL_DEL,fd,NULL);
                                                                close(fd);
                                                        }else{
                                                                printf("client#%s\n",buf);
                                                                fflush(stdout);

                                                                ep_ev.data.fd = fd;
                                                                ep_ev.events = EPOLLOUT|EPOLLET;

                                                                epoll_ctl(epoll_fd,EPOLL_CTL_MOD,fd,&ep_ev);
                                                        }
                                                }else if(ready_ev[i].events&EPOLLOUT)
                                                {
                                                        const char * msg = ".................";
                                                        send(fd,msg,strlen(msg),0);
                                                        epoll_ctl(epoll_fd,EPOLL_CTL_DEL,fd,NULL);
                                                        close(fd);
                                                }
                                        }
                                }
                        break;
                }
        }
}

           

繼續閱讀