/* 10. main.c */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/resource.h> /*setrlimit */
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <signal.h>
#include <pthread.h>
#include "mytimer.h"
#include "client.h"
/* Address handed to socket_bind(); that function currently binds INADDR_ANY instead. */
#define IPADDRESS "127.0.0.1"
/* TCP port the server listens on. */
#define PORT 1883
/* Backlog passed to listen(). */
#define LISTENQ 512
/* Base number of descriptors the server is sized for. */
#define FDSIZE 80000
/* Maximum number of events fetched by a single epoll_wait() call. */
#define EPOLLEVENTS 100
#define CONFIG_MIN_RESERVED_FDS 32 /* taken from the Redis sources */
#define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96)
/*
 * Milliseconds of silence after which the heartbeat timer drops a client.
 * Parenthesized so the macro behaves as a single value inside any
 * expression (e.g. `x / CLIENT_TIMEOUT`).
 */
#define CLIENT_TIMEOUT (60 * 1000)
/*
 * Set to 1 by the exit signal handler; the event loop runs while it is 0.
 * NOTE(review): `volatile sig_atomic_t` is the portable type for a flag
 * written from a signal handler - switch once it is confirmed that no other
 * translation unit declares this symbol as plain `int`.
 */
int stop_server = 0;
/* Next timer id handed to an accepted client; incremented on every accept. */
int timer_id = 0;
//Function declarations
//Create the listening socket and bind it
static int socket_bind(const char* ip,int port);
//I/O multiplexing loop built on epoll
static void do_epoll(int listenfd);
//Dispatch the events returned by epoll_wait
static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd);
//Handle an incoming connection on the listening socket
static void handle_accpet(int epollfd,int listenfd);
//Read handler
static void do_read(int epollfd,int fd);
//Write handler
static void do_write(int epollfd,int fd);
//Add an event registration
static void add_event(int epollfd,int fd,int state);
//Modify an event registration
static void modify_event(int epollfd,int fd,int state);
//Delete an event registration
static void delete_event(int epollfd,int fd,int state);
//Miscellaneous helpers
static int do_error(int fd, int *error);
static int setnonblocking(int fd);
static void daemonize(void);
static int set_fdlimit();
static void signal_exit_handler();
static void signal_exit_func(int signo);
static void handle_timer();
static int timerfun_callback(int arg);
static int user_read(client_t *client, buffer_t *rbuffer);
static int user_write(client_t *client, unsigned char* data, int len);
/*
 * Program entry point: raises the descriptor limit, creates the listening
 * socket, installs the signal handlers and runs the epoll loop until a
 * termination signal sets stop_server.
 *
 * Returns 0 on a clean shutdown and -1 when start-up fails.
 */
int main(int argc,char *argv[])
{
    /* Command-line port selection (argv[1]) is currently disabled;
     * the server always listens on PORT. */
    (void)argc;
    (void)argv;

    /* Set the maximum number of files/sockets this process may open. */
    if (set_fdlimit() < 0) {
        return -1;
    }

    int background = 0;
    if (background) {
        daemonize();
    }

    /* SIGHUP is deliberately left alone: ignoring it would keep the process
     * running after its terminal window is closed.
     * SIGPIPE is ignored so that a write to a closed peer fails with an
     * error code instead of terminating the process. */
    signal(SIGPIPE, SIG_IGN);

    create_fileEvent(FDSIZE + CONFIG_FDSET_INCR);

    int listenfd = socket_bind(IPADDRESS, PORT);
    if (listen(listenfd, LISTENQ) == -1) {
        perror("listen error");
        close(listenfd);
        destroy_fileEvent(FDSIZE + CONFIG_FDSET_INCR);
        return -1;
    }
    printf("start listening...\n");

    signal_exit_handler();
    timer_init();
    do_epoll(listenfd);
    timer_destroy();

    close(listenfd);
    destroy_fileEvent(FDSIZE + CONFIG_FDSET_INCR);
    return 0;
}
/*
 * Creates a TCP socket with SO_REUSEADDR set and binds it to `port` on all
 * local interfaces (INADDR_ANY).
 *
 * ip   - currently unused: binding to a specific address was disabled in
 *        favour of INADDR_ANY.
 * port - TCP port in host byte order.
 *
 * Returns the bound (not yet listening) socket descriptor. Any failure is
 * fatal: the error is printed and the process exits with status 1, so the
 * return value is always a valid descriptor.
 */
static int socket_bind(const char* ip,int port)
{
    (void)ip;

    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd == -1) {
        perror("socket error:");
        exit(1);
    }

    /* Allow a quick restart while old connections linger in TIME_WAIT. */
    int reuse = 1;
    if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) {
        /* Previously this path returned 1, which the caller would have
         * used as if it were the listening descriptor. */
        perror("setsockopt error:");
        close(listenfd);
        exit(1);
    }

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons((uint16_t)port);
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
        perror("bind error: ");
        close(listenfd);
        exit(1);
    }
    return listenfd;
}
/*
 * Main event loop: waits on an epoll instance for socket events, dispatches
 * them, then runs the timer handler. Loops until stop_server becomes
 * non-zero or epoll_wait() fails with an unrecoverable error.
 *
 * listenfd - listening socket; registered for EPOLLIN.
 */
static void do_epoll(int listenfd)
{
    struct epoll_event events[EPOLLEVENTS];

    int epollfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (epollfd == -1) {
        perror("epoll_create error");
        return;
    }

    /* Watch the listening descriptor for new connections. */
    add_event(epollfd, listenfd, EPOLLIN);

    while (stop_server == 0) {
        /* -1 makes epoll_wait() block until an event or a signal arrives. */
        int timeout_ms = -1;

        struct event *event = timer_top();
        if (event != NULL) {
            /* Same approach as aeProcessEvents() in Redis' ae.c: how many
             * milliseconds until the next timer event should fire? */
            long now_sec, now_ms;
            aeGetTime(&now_sec, &now_ms);
            long long ms = (long long)(event->ev_timeout.tv_sec - now_sec) * 1000 +
                           (event->ev_timeout.tv_usec / 1000 - now_ms);
            /* An already-expired timer means: poll without blocking. */
            timeout_ms = (ms > 0) ? (int)ms : 0;
            printf("timer_wait:%d\n", timeout_ms); /* ms */
        } else {
            printf("tvp == NULL\n");
        }

        /* Fetch the descriptors that are ready. */
        int ret = epoll_wait(epollfd, events, EPOLLEVENTS, timeout_ms);
        if (ret == -1) {
            if (errno != EINTR) {
                perror("epoll_wait error");
                break;
            }
            /* Interrupted by a signal: no events, but timers still run and
             * the loop condition re-checks stop_server. */
            ret = 0;
        }

        handle_events(epollfd, events, ret, listenfd);
        handle_timer();
    }
    close(epollfd);
}
/*
 * Dispatches the events reported by one epoll_wait() call.
 *
 * epollfd  - epoll instance the events came from.
 * events   - array filled in by epoll_wait().
 * num      - number of valid entries in `events` (<= 0 means nothing to do).
 * listenfd - listening socket; readiness on it means a pending connection.
 */
static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd)
{
    for (int i = 0; i < num; i++) {
        int fd = events[i].data.fd;
        uint32_t ready = events[i].events;

        /* The listening socket has no client attached, so it must never be
         * routed to the client read/write handlers. */
        if (fd == listenfd) {
            if (ready & EPOLLIN) {
                handle_accpet(epollfd, listenfd);
            }
            continue;
        }

        /* EPOLLERR and EPOLLHUP are reported even when not requested. They
         * are routed to the read path, where read() reports EOF or the
         * error and the client is released; ignoring them would make a
         * level-triggered epoll_wait() return immediately forever. */
        if (ready & (EPOLLIN | EPOLLERR | EPOLLHUP)) {
            do_read(epollfd, fd);
        } else if (ready & EPOLLOUT) {
            do_write(epollfd, fd);
        }
    }
}
//Timer step of the event loop: delegates to timer_process() from the timer module.
static void handle_timer()
{
timer_process();
}
static void handle_accpet(int epollfd,int listenfd)
{
int clifd;
struct sockaddr_in cliaddr;
socklen_t cliaddrlen = sizeof(cliaddr);
clifd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);
if (clifd == -1)
perror("accpet error:");
else
{
printf("accept a new client: %s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
client_t *client = alloc_client();
if (!client) {
printf("alloc client error...close socket\n");
close(clifd);
return;
}
client->fd = clifd;
client->epollfd = epollfd;
client->timerId = timer_id;
fileev[clifd].clientData = client;
//添加一個客戶描述符和事件
add_event(epollfd,clifd,EPOLLIN);
//add timer
timer_add(timer_id, 1000, timerfun_callback, clifd, CYCLE_TIMER, 0);//ms
timer_id++;
}
}
/*
 * Read handler for a client socket: appends whatever is available to the
 * client's read buffer and hands it to user_read(). On EOF or a hard read
 * error the client is unregistered from epoll, its timer is removed and the
 * client is released.
 *
 * A single read() is issued per readiness notification. The original
 * author's reasons for not using anetRead() here:
 *   1. it makes a peer disconnect hard to detect;
 *   2. it loops until all data is read, which suits edge-triggered epoll;
 *   3. Redis itself does not call anetRead() either.
 */
static void do_read(int epollfd,int fd)
{
    client_t *client = fileev[fd].clientData;
    buffer_t *rbuffer = client->read_buffer;

    check_buffer_size(rbuffer, DEFAULT_BUFF_SIZE / 2);
    size_t avail = rbuffer->size - rbuffer->write_idx;

    /* Store the received network data in the read buffer. */
    ssize_t readn = read(fd, rbuffer->buff + rbuffer->write_idx, avail);
    if (readn > 0) {
        rbuffer->write_idx += readn;
        user_read(client, rbuffer);
        client->last_recv_tick = get_tick_count();
        return;
    }

    if (readn == 0) {
        printf("fd=%d, client disconnect, close it.\n", client->fd);
    } else {
        /* Transient conditions: keep the client and wait for the next
         * readiness notification. */
        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
            return;
        }
        printf("read error,%s.\n", strerror(errno));
    }

    /* One EPOLL_CTL_DEL removes the descriptor from the epoll set entirely,
     * whatever events it was registered for. */
    delete_event(epollfd, fd, EPOLLIN);
    timer_remove(client->timerId);
    free_client(client);
}
static void do_write(int epollfd,int fd)
{
client_t *client = fileev[fd].clientData;
buffer_t *wbuffer = client->write_buffer;
int data_size = (int)get_readable_size(wbuffer);
if (data_size == 0) {
delete_event(epollfd,fd,EPOLLOUT);
return;
}
//int writen = anetWrite(client->fd, (char *)wbuffer->buff + wbuffer->read_idx, data_size);
int writen = write(client->fd, (char *)wbuffer->buff + wbuffer->read_idx, data_size);
if (writen > 0) {
wbuffer->read_idx += writen;
} else if (writen == 0) {
printf("Writing 0\n");
} else { //-1
if (errno != EWOULDBLOCK) {
printf("Writing error: %s\n", strerror(errno));
} else {
printf("Writing EWOULDBLOCK\n");
}
}
if (get_readable_size(wbuffer) == 0) {
delete_event(epollfd,fd,EPOLLOUT);
}
}
static int user_read(client_t *client, buffer_t *rbuffer)
{
size_t len = get_readable_size(rbuffer);
unsigned char data[DEFAULT_BUFF_SIZE];
if (len > DEFAULT_BUFF_SIZE)
{
len = DEFAULT_BUFF_SIZE;
}
//把活塞緩存讀取出來,作為使用者資料
memcpy(data, rbuffer->buff + rbuffer->read_idx, len);
rbuffer->read_idx += len;
int i = 0;
for (i = 0; i < len; i++)
{
printf("%c", data[i]);
}
printf("\n");
user_write(client, data, len);//echo
return 0;
}
static int user_write(client_t *client, unsigned char* data, int len)
{
//把使用者資料寫入活塞緩存
buffer_t *wbuffer = client->write_buffer;
check_buffer_size(wbuffer, len);
memcpy((char *)wbuffer->buff + wbuffer->write_idx, data, len);
wbuffer->write_idx += len;
//把活塞緩存的有效資料通過網絡發送出去
//int writen = anetWrite(client->fd, (char *)wbuffer->buff + wbuffer->read_idx, (int)get_readable_size(wbuffer));
int writen = write(client->fd, (char *)wbuffer->buff + wbuffer->read_idx, (int)get_readable_size(wbuffer));
if (writen > 0) {
wbuffer->read_idx += writen;
} else if (writen == 0) {
printf("Writing 0\n");
} else { //-1
if (errno != EWOULDBLOCK) {
printf("Writing error: %s\n", strerror(errno));
} else {
printf("Writing EWOULDBLOCK\n");
}
}
//如果writen==-1,表示目前tcp視窗容量不夠,需要等待下次機會再發,errno == EWOULDBLOCK
//因為活塞緩存的有效資料沒有發完,遺留部分需要再給機會
if (get_readable_size(wbuffer) != 0) {
/*
if (aeCreateFileEvent(client->loop, client->fd,
AE_WRITABLE, writeEventHandler, client) == AE_ERR) {
printf("create socket writeable event error, close it.\n");
free_client(client);
}*/
//modify_event(client->epollfd,client->fd,EPOLLOUT);
add_event(client->epollfd,client->fd,EPOLLOUT);
}
return 0;
}
/*
 * Registers `fd` with the epoll instance `epollfd` for the events in
 * `state`, then switches `fd` to non-blocking mode.
 *
 * A failing epoll_ctl() is reported on stderr; the descriptor is still
 * switched to non-blocking mode afterwards.
 */
static void add_event(int epollfd,int fd,int state)
{
    struct epoll_event ev = {0};
    ev.events = state;
    ev.data.fd = fd;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) == -1) {
        perror("epoll_ctl(EPOLL_CTL_ADD) error");
    }
    setnonblocking(fd);
}
/*
 * Removes `fd` from the epoll instance `epollfd`.
 *
 * EPOLL_CTL_DEL removes the descriptor entirely; `state` does not select
 * which events are dropped. A non-NULL event is still passed because
 * kernels before 2.6.9 require one. ENOENT (descriptor not registered) is
 * tolerated silently, since callers may remove the same descriptor twice.
 */
static void delete_event(int epollfd,int fd,int state)
{
    struct epoll_event ev = {0};
    ev.events = state;
    ev.data.fd = fd;
    if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) == -1 && errno != ENOENT) {
        perror("epoll_ctl(EPOLL_CTL_DEL) error");
    }
}
/*
 * Replaces the set of events watched for `fd` on `epollfd` with `state`.
 * `fd` must already be registered; a failing epoll_ctl() is reported on
 * stderr.
 */
static void modify_event(int epollfd,int fd,int state)
{
    struct epoll_event ev = {0};
    ev.events = state;
    ev.data.fd = fd;
    if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev) == -1) {
        perror("epoll_ctl(EPOLL_CTL_MOD) error");
    }
}
/*
 * Reports the current errno on stderr, closes `fd` and hands the error code
 * back to the caller.
 *
 * fd    - descriptor to close; negative values are skipped.
 * error - receives the errno value observed on entry.
 *
 * Returns 1. errno is restored to its value on entry.
 */
static int do_error(int fd, int *error)
{
    /* Capture errno first: fprintf() is allowed to change it. */
    int saved = errno;

    fprintf(stderr, "error: %s\n", strerror(saved));
    *error = saved;

    /* close() is attempted exactly once. On Linux the descriptor is released
     * even when close() fails with EINTR, so retrying could close an
     * unrelated descriptor that reused the same number. */
    if (fd >= 0) {
        close(fd);
    }

    errno = saved;
    return 1;
}
/*
 * Puts `fd` into non-blocking mode.
 *
 * Returns the file status flags `fd` had before the call, or -1 when the
 * flags could not be read or updated (errno is left set by fcntl()).
 */
static int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    if (old_option == -1) {
        /* Never feed the -1 error value back into F_SETFL as flags. */
        return -1;
    }
    if (fcntl(fd, F_SETFL, old_option | O_NONBLOCK) == -1) {
        return -1;
    }
    return old_option;
}
/*
 * Detaches the process from its terminal (adapted from daemonize() in
 * Redis' server.c): forks, lets the parent exit, starts a new session in
 * the child and redirects stdin/stdout/stderr to /dev/null.
 *
 * If fork() fails the error is reported and the process exits with
 * status 1.
 */
static void daemonize(void) {
    pid_t pid = fork();
    if (pid < 0) {
        perror("fork error");
        exit(1);
    }
    if (pid != 0) {
        exit(0); /* parent exits */
    }

    setsid(); /* create a new session */

    /* Every output goes to /dev/null, so nothing printed to stdout or
     * stderr is visible once daemonized. */
    int fd = open("/dev/null", O_RDWR, 0);
    if (fd != -1) {
        dup2(fd, STDIN_FILENO);
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
        if (fd > STDERR_FILENO) {
            close(fd);
        }
    }
}
/*
 * Sets both the soft and the hard RLIMIT_NOFILE limit of this process to
 * FDSIZE + CONFIG_FDSET_INCR - the programmatic counterpart of running
 * `ulimit -n` in a shell.
 *
 * Returns 0 on success; on failure prints the reason and returns -1.
 */
static int set_fdlimit()
{
    const rlim_t wanted = FDSIZE + CONFIG_FDSET_INCR;
    struct rlimit limits = { .rlim_cur = wanted, .rlim_max = wanted };

    if (setrlimit(RLIMIT_NOFILE, &limits) != -1) {
        return 0;
    }
    perror("setrlimit error");
    return -1;
}
/*
 * Installs signal_exit_func as the handler for the signals that should shut
 * the server down:
 *   SIGINT  - Ctrl+C
 *   SIGTERM - plain `kill <pid>`
 *   SIGQUIT - Ctrl+\
 *
 * SIGKILL (`kill -9 <pid>`) and SIGSTOP cannot be caught, so no handler is
 * registered for them; sigaction() would only fail.
 */
static void signal_exit_handler()
{
    static const int exit_signals[] = { SIGINT, SIGTERM, SIGQUIT };

    struct sigaction sa;
    memset(&sa, 0, sizeof(sa)); /* sa_flags stays 0 */
    sa.sa_handler = signal_exit_func;
    sigemptyset(&sa.sa_mask);

    for (size_t i = 0; i < sizeof(exit_signals) / sizeof(exit_signals[0]); i++) {
        if (sigaction(exit_signals[i], &sa, NULL) == -1) {
            perror("sigaction error");
        }
    }
}
/*
 * Signal handler for the exit signals: logs the signal number and asks the
 * event loop to stop by setting stop_server.
 *
 * Only async-signal-safe operations are used. The message is formatted by
 * hand and emitted with write(), because printf() must not be called from a
 * signal handler.
 */
static void signal_exit_func(int signo)
{
    static const char prefix[] = "exit signo is ";
    char buf[sizeof(prefix) + 12]; /* prefix + sign + 10 digits + '\n' */
    size_t len = 0;

    while (len < sizeof(prefix) - 1) {
        buf[len] = prefix[len];
        len++;
    }

    /* Decimal conversion, least significant digit first. */
    unsigned int value = (signo < 0) ? 0u - (unsigned int)signo : (unsigned int)signo;
    char digits[10];
    int ndigits = 0;
    do {
        digits[ndigits++] = (char)('0' + value % 10);
        value /= 10;
    } while (value != 0);

    if (signo < 0) {
        buf[len++] = '-';
    }
    while (ndigits > 0) {
        buf[len++] = digits[--ndigits];
    }
    buf[len++] = '\n';

    /* Keep errno intact for whatever code the signal interrupted. */
    int saved_errno = errno;
    if (write(STDOUT_FILENO, buf, len) < 0) {
        /* Nothing useful can be done about a failed log write here. */
    }
    errno = saved_errno;

    stop_server = 1;
}
/*
 * Heartbeat timer callback for one client.
 *
 * arg - the client's socket descriptor, used as index into fileev.
 *
 * Returns 1 while the client is still considered alive. Returns 0 after the
 * client has been silent for more than CLIENT_TIMEOUT ms and was dropped.
 * Per the original author's note, a return value of 0 keeps the timer from
 * being re-inserted into the heap; the timer has already been popped when
 * the callback runs, so it must not be removed explicitly here.
 */
static int timerfun_callback(int arg)
{
    client_t *peer = fileev[arg].clientData;
    assert(arg == peer->fd);
    printf("timer_id=%d, fd=%d\n", peer->timerId, peer->fd);

    /* Heartbeat check: a client that sent something recently stays. */
    uint64_t now = get_tick_count();
    if (now <= peer->last_recv_tick + CLIENT_TIMEOUT) {
        return 1;
    }

    /* No data for too long: kick the client. */
    printf("timeout, fd=%d\n", peer->fd);
    delete_event(peer->epollfd, peer->fd, EPOLLIN);
    delete_event(peer->epollfd, peer->fd, EPOLLOUT);
    free_client(peer);
    return 0; /* kill timer */
}