目錄
基礎
IO複用機制
select機制:
select服務端
poll機制
poll例子
epoll 基礎
基礎
setvbuff函數:設定id緩沖區
例:setvbuff(fp_src,buffer,_IOLBF,128);
_IOFBF 全緩沖區: BUFSIZE ffliush 預設開啟
_IOLBF 行緩沖區:遇到換行符重新整理
_IONBF 無緩沖區:read write strerr
指針緩沖區: setbuf(FILE * stream,char*buf) buf必須指向一個長度為BUFSIZE 大小的緩沖區
socket的io模型
1.非阻塞io處理 2.阻塞io處理 3.io複用的io方式 4.異步io 5.信号驅動式io
IO複用機制
select機制:
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout); //檔案最多不能超過2000
第一個參數:所有檔案夾檔案數加一 2:等待讀檔案夾 3:等待寫檔案夾 4:等待錯誤檔案夾 5 :select逾時時間
第五個參數:
struct timeval {
long tv_sec;
long tv_usec;
};
傳回值-1:錯誤 0:逾時 >=1:可操作性檔案個數
使用方法:
1.建立fd_set: struct fd_set rfds;
2.設定fd_set: FD_ZERO(&rfds) 加入句柄FD_SET(fd,&rfds) 移除FD_CLR(int fd ,fd_set * set)
3.使用:select() accept()等待 查詢 int FD_ISSET(int fd ,fd_set * set)
注意:1:每次重新發起select 需要重新添加fd_set
2.将放入到select中的句柄都設定為非阻塞狀态
select服務端
#include <stdlib.h>
#include <sys/types.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/wait.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/time.h>
#define MAXBUF 1024
int main(int argc,char *argv[])
{
int sockfd,new_fd;
socklen_t len;
struct sockaddr_in my_addr,their_addr;
unsigned int myport,lisnum;
char buf[MAXBUF+1];
fd_set rfds;
struct timeval tv;
int retval,maxfd= -1;
if (argv[2])
{
myport = atoi(argv[2]);
}else
myport = 6666;
if (argv[3])
{
lisnum = atoi(argv[3]);
}else
lisnum = 2;
if ((sockfd = socket(AF_INET,SOCK_STREAM,0)) == -1)
{
perror("socket fail!\n");
exit(EXIT_FAILURE);
}
bzero(&my_addr,sizeof(my_addr));
my_addr.sin_family = PF_INET;
my_addr.sin_port = htons(myport);
if (argv[1])
{
my_addr.sin_addr.s_addr = inet_addr(argv[1]);
}else
my_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(sockfd,(struct sockaddr *)&my_addr,sizeof(struct sockaddr))==-1)
{
perror("socket bind fail!\n");
exit(EXIT_FAILURE);
}
if (listen(sockfd,lisnum)==-1)
{
perror("socket fail!\n");
exit(EXIT_FAILURE);
}
while(1)
{
printf("\n----------wait for new socket connect-----------\n");
len = sizeof(struct sockaddr);
if((new_fd = accept(sockfd,(struct sockaddr*)&their_addr,&len))==-1)
{
perror("socket fail!\n");
exit(EXIT_FAILURE);
}else
printf("server: got connect from%s,port%d,socket%d\n",inet_ntoa(their_addr.sin_addr),ntohs(their_addr.sin_port),new_fd);
while(1){
FD_ZERO(&rfds);
FD_SET(0,&rfds);//将标準輸入放入檔案夾
FD_SET(new_fd,&rfds);
maxfd = new_fd;
tv.tv_sec = 1;
tv.tv_usec = 0;
retval = select(maxfd+1,&rfds,NULL,NULL,&tv);
if(retval == -1)
{
perror("select fail!\n");
exit(EXIT_FAILURE);
}else if(retval == 0){
continue;
}
else{
if (FD_ISSET(0,&rfds))
{
bzero(buf,MAXBUF +1);
fgets(buf,MAXBUF,stdin);
if (!strncasecmp(buf,"quit",4))
{
printf("i will quit!\n");
break;
}
len = send(new_fd,buf,strlen(buf)-1,0);
if(len>0)
printf("send successful,%d byte send!\n",len);
else
{
printf("send failure\n");
}
}
if (FD_ISSET(new_fd,&rfds))
{
bzero(buf,MAXBUF+1);
len = recv(new_fd,buf,MAXBUF,0);
if (len>0)
{
printf("recv success! value:%s len:%d\n",buf,len);
}else{
if (len<0)
{
printf("recv failure\n");
}
else{
printf("the other one end,quit\n");
break;
}
}
}
}
}
close(new_fd);
printf("need othe connect (no-quit)\n");
fflush(stdout);
bzero(buf ,MAXBUF+1);
fgets(buf,MAXBUF,stdin);
if (!strncasecmp(buf,"no",2))
{
printf("quit!\n");
break;
}
}
close(new_fd);
close(sockfd);
fflush(stdout);
fflush(stdin);
printf("system quit!\n");
return 0;
}
poll機制
struct pollfd
{
int fd 目前句柄
short events 句柄綁定的事件
short revents 傳回事件的類型
}
#define POLLIN 0x0001 輸入事件
#define POLLPRI 0x0002 優先級
#define POLLOUT 0x0003
#define POLLERR 0x0004
#define POLLIHUP 0x0005
#define POLLINUAL 0x0006
pollfd
.fd = listen_fd
.events = POLLIN|POLLPRI
int poll (struct pollfd *fds,nfds_t nfds,int timeout)
第一個參數:poll連結清單 2:多少個等待 3:逾時時間 毫秒
poll例子
#include "stdio.h"
#include "poll.h"
#include "string.h"
int main(int argc,char*argv[])
{
int timeout = 3000;
char buf[1024];
struct pollfd fd_poll[1];
while(1){
fd_poll[0].fd = 0;
fd_poll[0].events = POLLIN;
fd_poll[0].revents = 0;
memset(buf,'\0',sizeof(buf));
switch(poll(fd_poll,1,timeout)){
case 0:
printf("逾時!\n");
break;
case -1:
printf("錯誤\n");
break;
default:
if (fd_poll[0].revents&POLLIN)
{
gets(buf);
printf("buf:%s\n",buf);
if (!strncasecmp(buf,"quit",4))
{
printf("Game Over!!!\n");
return 0;
}
}
break;
}
}
return 0;
}
epoll 基礎
并發性高,可同時監聽c10k以上級别的客戶事件
實作原理:fd----event
大規模并發伺服器架構:epoll + threadpoll+mysql
對象epoll_fd epoll_event
建立epollfd對象
int epoll_creat(int size);傳回值 epoll_fd epoll_event size 最大監聽多少
設定epolldf對象
建立 epoll_event對象
struct epoll_event ep_ev;
設定 epoll_event對象
ep_ev.events = EPOLLIN;//資料的讀取
ep_ev.data.fd = listen_sock;
使用 epoll_event對象
epoll_ctl(int epfd, int op, int fd , struct epoll_event event);
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_sock, &ep_ev);
EPOLL_CTL_ADD EPOLL_CTLMOD EPOLL_CTL_DEL
使用epollfd對象;
int epoll_wait(int eplf, struct epoll_event * events, int maxevents, int timeout);
//第二個參數epoll_event * events 是用來做傳回的
type union epoll_data{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
}epoll_data_t;
struct epoll_event{
uint32_t events;
epoll_data_t data;
}
#include "stdio.h"
#include "unistd.h"
#include "sys/types.h"
#include "sys/socket.h"
#include "netinet/in.h"
#include "arpa/inet.h"
#include "sys/epoll.h"
#include "fcntl.h"
#include "stdlib.h"
#include <string.h>
int creat_socket(char *ip,char *port)
{
int sock = socket(AF_INET,SOCK_STREAM,0);
if(sock<0){
perror("socket");
exit(2);
}
int opt = 1;
//設定socket 先斷開時 避免進入time_wait狀态,屬性SO_REUSEADDR,是使其位址能夠重用
if(setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt))<0)
{
perror("setsockopt");
exit(3);
}
struct sockaddr_in local;
local.sin_family = AF_INET;
local.sin_port = htons(atoi(port));
local.sin_addr.s_addr = inet_addr(ip);
if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
{
perror("bind");
exit(4);
}
if(listen(sock,5)<0){
perror("listen");
exit(5);
}
printf("listen and bind succeed\n");
return sock;
}
int set_noblock(int sock)
{
int fl = fcntl(sock,F_GETFL);
return fcntl(sock,F_SETFL,fl|O_NONBLOCK);
}
int main(int argc,char *argv[])
{
if(argc != 3)
{
printf("please use:%s [ip] [port]\n",argv[0]);
exit(1);
}
int listen_sock = creat_socket(argv[1],argv[2]);
int epoll_fd = epoll_create(256);
if(epoll_fd <0)
{
perror("epoll_create");
exit(6);
}
struct epoll_event ep_ev;
ep_ev.events = EPOLLIN;
ep_ev.data.fd = listen_sock;
if (epoll_ctl(epoll_fd,EPOLL_CTL_ADD,listen_sock,&ep_ev)<0)
{
perror("epoll_ctl");
exit(7);
}
struct epoll_event ready_ev[128];
int maxnum = 128;
int timeout = 1000;//設定逾時時間 若為-1則永久等待
int ret = 0;
int done = 0;
while(!done)
{
switch(ret = epoll_wait(epoll_fd,ready_ev,maxnum,timeout))
{
case -1:
perror("epoll_wait");
exit(8);
break;
case 0:
// perror("time out ...\n");
break;
default:
// printf("a new sigle!\n");
for (int i = 0; i < ret; ++i)
{
//判斷是否是監聽套接字,是的話accept
int fd = ready_ev[i].data.fd;
if ((fd==listen_sock)&&(ready_ev[i].events&EPOLLIN))
{
struct sockaddr_in remote;
socklen_t len = sizeof(remote);
int accept_sock = accept(listen_sock,(struct sockaddr*)&remote,&len);
if (accept<0)
{
perror("accept");
continue;
}
printf("accept a clinent..[ip]:%s,[port]:%d\n",inet_ntoa(remote.sin_addr),ntohs(remote.sin_port));
ep_ev.events = EPOLLIN|EPOLLET;
ep_ev.data.fd = accept_sock;
set_noblock(accept_sock);
if(epoll_ctl(epoll_fd,EPOLL_CTL_ADD,accept_sock,&ep_ev)<0)
{
perror("epoll_ctl");
close(accept_sock);
}
}else{
if(ready_ev[i].events&EPOLLIN){
//申請空間同時存檔案描述符合緩沖區位址
char buf[102400];
memset(buf,'\0',sizeof(buf));
ssize_t _s = recv(fd,buf,sizeof(buf)-1,0);
if (_s<0)
{
perror("recv");
continue;
}else if(_s==0){
printf("remote close..\n");
epoll_ctl(epoll_fd,EPOLL_CTL_DEL,fd,NULL);
close(fd);
}else{
printf("client#%s\n",buf);
fflush(stdout);
ep_ev.data.fd = fd;
ep_ev.events = EPOLLOUT|EPOLLET;
epoll_ctl(epoll_fd,EPOLL_CTL_MOD,fd,&ep_ev);
}
}else if(ready_ev[i].events&EPOLLOUT)
{
const char * msg = ".................";
send(fd,msg,strlen(msg),0);
epoll_ctl(epoll_fd,EPOLL_CTL_DEL,fd,NULL);
close(fd);
}
}
}
break;
}
}
}