現在來陳述第二節遺留的問題,在多線程的場景下,假如主線程檢測到新連接配接的***已連接配接套接字***上有讀寫事件發生時,那麼此時如何把該套接字的IO事件交給子線程(Sub Reactor)去處理,一般情況下子線程是處于epoll_dispatch的調用上,如果沒有事件便處于阻塞的狀态。如何去喚醒阻塞在epoll_dispatch的子線程呢?這裡就涉及到socketpair技術的應用。
在初始化EventLoop對象時,建立一個socketpair對象(端對端的通信,類似于程序間通信的管道),把ScoketPair一端套接字上可讀事件添加到EventLoop的事件清單中去。
if(socketpair(AF_UNIX,SOCK_STREAM,0,eventLoop->socketPair) < 0)
{
printf("make socketPair error");
}
//socketPair[1]監聽EVENT_READ事件,每個線程都有一個socketpair
struct channel *channel1 = channel_new(eventLoop->socketPair[1],EVENT_READ,handleWakeup,NULL,eventLoop);
event_loop_add_channel_event(eventLoop,eventLoop->socketPair[1],channel1);
void event_loop_wakeup(void *data)
{
struct event_loop *eventLoop = (struct event_loop *)data;
char one = 'a';
size_t n = write(eventLoop->socketPair[0],&one,sizof one);
if(n < 0)
{
printf("event_loop_wakeup error");
return ;
}
printf("event_loop_wakeup error");
}
//從socketpair[1]上讀取該字元,成功讀取,表示目前的EventLoop被喚醒
void handleWakeup(void *data)
{
struct event_loop *eventLoop = (struct event_loop *)data;
char one;
size_t n = read(eventLoop->socketPair[1],&one,sizeof one);
if(n != sizeof one)
{
printf("handle wakeuo error");
return ;
}
printf("handle wakeup success");
}
當發現添加channel事件的EventLoop對象和目前線程不是同一個線程,那麼往被添加channel事件的EventLoop對象的socketpair[0]上面寫一個字元,随便什麼都可以。相當于用根“棍子”戳對方一下,嘿!大兄弟,有任務了,不要再阻塞了,趕緊醒過來。
介紹完socketPair對象,現在開始詳細介紹讀寫緩沖區buffer的構造和結構。
//buffer既可以表示發送緩沖區,也可以表示接收緩沖區。
struct buffer
{
void *data;
//目前讀取的位置
int readIndex;
//目前緩沖區中已經寫到的位置
int writeIndex;
//緩沖區大小
int totalSize;
};
int buffer_front_spare_size(struct buffer *buffer1)
{
//已經讀取過的空間,可以認為是空餘的空間,可以再次填充其它資料把它覆寫掉
return buffer1->readIndex;
}
int buffer_readable_size(struct buffer *buffer1)
{
//目前寫的位置減去讀取的位置,代表緩沖區中剩餘還未讀取的空間
return buffer1->writeIndex - buffer1->readIndex;
}
int buffer_writeable_size(struct buffer *buffer1)
{
//總大小減去目前已經寫的位置,代表還剩多少空間可寫
return buffer1->totalSize - buffer1->writeIndex;
}
void make_room(struct buffer *buffer1,int size)
{
if(buffer_writeable_size(buffer1) >= size)
{
return ;
}
int readableSize = 0;
if(buffer_front_size(buffer1) + buffer_readable_size(buffer1) > size)
{
//把未讀取完的資料拷貝到前面已經讀取完的空間去
readableSize = buffer_readable_size(buffer1);
for(int i =0;i < size;i++)
{
memcpy(buffer1->data + i,buffer1->data + buffer1->readIndex,1);
}
//更新緩沖區的讀寫索引,從0開始
buffer1->readIndex = 0;
//目前寫到的位置,就是上一次還未讀取完的資料的大小,從0開始計算
buffer1->writeIndex = readableSize;
}else
{
//如果要拼接的資料超過了已讀和待讀空間大小的總和,那麼重新開辟新的空間
void *temp = (void *)realloc(buffer1->data,buffer1->total_size + size);
buffer1->data = (void *)temp;
buffer1->total_size += size;
}
}
//拼接一段資料
void buffer_append(struct buffer *buffer1,void *data,int size)
{
if(data != NULL)
{
make_room(buffer1, size);
memcpy(buffer1->data + buffer1->writeIndex, data, size);
buffer1->writeIndex += size;
}
}
//拼接一個字元
void buffer_append_char(struct buffer *buffer1,char data)
{
make_room(buffer1,1);
buffer1->data[buffer1->writeIndex++] = data;
}
//拼接一段字元
void buffer_append_string(struct buffer *buffer1,char * str)
{
if(data != NULL)
{
int len = strlen(buffer1);
buffer_append_data(buffer1, data, len);
}
}
int buffer_socket_read(struct buffer *buffer1,int fd)
{
//為什麼要使用readv和writev函數,因為用戶端發送古來的資料很有可能是零散的,不連續的,資料是分布在零散的空間内,剛好readv和writev可以在零散地在多個記憶體空間内進行原子性的資料讀寫操作。
char additional_buffer[INIT_BUFFER_SIZE];
//iovec 顧名思義就是IO vector
struct iovec vec[2];
int max_writeable = buffer_writeable_size(buffer1);
//iov_base指向待讀取的第一段記憶體空間的資料
vec[0].iov_base = buffer1->data + buffer1->writeIndex;
//iov_len代表第一段待讀取記憶體空間資料的長度
vec[0].iov_len = max_writeable;
vec[1].iov_base = additional_buffer;
vec[1].iov_len = sizeof(additional_buffer);
int result = readv(fd,vec,2);
if(result < 0)
{
return -1;
}else if(result <= max_writeable)
{
//如果總共的資料長度小于接受緩沖區内可寫的空間的長度,那麼就直接把接收到的資料拼接在剩餘的可寫空間内
buffer1->writeIndex += result;
}else
{
buffer1->writeIndex = buffer1->total_size;
buffer_append(buffer1,additional_buffer,result - max_writeable);
}
return result;
}
char buffer_read_char(struct buffer *buffer1)
{
char c = buffer1->data[buffer1->readIndex++];
return c;
}
char * buffer_find_crlf(struct buffer *buffer1)
{
char *CRLF = "\r\n";
char *crlf = memmem(buffer1->data + buffer1->readIndex ,buffer_readable_size(buffer1),CRLF,2);
return crlf;
}
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIyVGduV2YfNWawNCM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0TPn1keZRUT00EVPpHOsJGcohVYsR2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLwkDN0MTNzIjM1EjMxAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
最後再來介紹下日志庫的設計,一個完整的系統肯定少不了日志庫的設計,在實際生産環境下,我們不可能使用printf向控制台執行日志的輸出列印,我們肯定需要自行設計一套日志庫出來,用于線上程式的追蹤和問題的排查。
#ifndef _LOG_H_
#define _LOG_H_
#include "stdarg.h"
#define LOG_DEBUG_LEVEL 0
#define LOG_MSG_LEVEL 1
#define LOG_WARN_LEVEL 2
#define LOG_ERR_LEVEL 3
void TcpServer_log(int security,const char *msg);
void TcpServer_logx(int security,const char *errstr,const char *fmt,va_list ap);
void TcpServer_Msgx(const char *fmt,...);
void TcpServer_debugx(const char *fmt,...);
void error(int status,int err,char *fmt,...);
#ifdef LOG_MSG(msg);
TcpServer_log(LOG_MSG_LEVEL,msg);
#endif
#ifdef LOG_ERR(msg)
TcpServer_log(LOG_ERR_LEVEL,msg);
#endif
#endif
#include "log.h"
#include "syslog.h"
#include <errno.h>
#define MAXLINE 4096 //一個日志檔案隻有4KB
void error(int status,int err,char *fmt,...)
{
//va_list實際上是一個指針,用來指向error函數的各個不定參數。
va_list ap;
//va_start将va_list定義的指針指向error函數的第一個不定參數,也即是fmt參數後面的一個參數。
va_start(ap,fmt);
//将fmt(錯誤資訊類型),錯誤資訊内容拼接到标準錯誤中去(Linux環境下,有标準輸入、标準輸出、标準錯誤)
vfprintf(stderr,fmt,ap);
va_end(ap);
if(err)
{
fprintf(stderr," :%s,(%d)\n",strerror(err),err);
}
if(status)
{
exit(status);
}
}
static void
err_doit(int errnoflag, int level, const char *fmt, va_list ap)
{
int errno_save, n;
char buf[MAXLINE + 1];
errno_save = errno;
/* value caller might want printed */
vsnprintf(buf, MAXLINE, fmt, ap);
n = strlen(buf);
if (errnoflag)
snprintf(buf + n, MAXLINE - n, ": %s", strerror(errno_save));
strcat(buf, "\n");
fflush(stdout);
/* in case stdout and stderr are the same */
fputs(buf, stderr);
fflush(stderr);
return;
}
void TcpServer_log(int security,const char *msg)
{
const char *security_str;
switch(security)
{
case LOG_DEBUG_LEVEL:
security_str = "debug";
break;
case LOG_MSG_LEVEL:
security_str = "msg";
break;
case LOG_WARN_LEVEL:
security_str = "warn";
break;
case LOG_ERR_LEVEL:
security_str = "err";
break;
default:
security_str = "unkonow";
break;
}
fprintf(stdout,"[%s] %s\n",security_str,msg);
}
void TcpServer_logx(int security,const char *errstr,const char *fmt,va_list ap)
{
char buf[1024];
size_t len;
if(fmt != NULL)
{
vsnprintf(buf,sizeof(buf),fmt,ap);
}else
{
buf[0] = '\0';
}
if(errstr)
{
len = strlen(buf);
if(len < sizeof(buf) -3)
{
snprintf(buf + len,sizeof(buf) - len,": %s",errstr);
}
}
TcpServer_log(security,buf);
}
void TcpServer_msg(const char *fmt,...)
{
va_list ap;
va_start(ap,fmt);
TcpServer_logx(LOG_MSG_LEVEL,NULL,fmt,ap);
va_end(ap);
}
void TcpServer_msgx(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
TcpServer_logx(LOG_MSG_TYPE, NULL, fmt, ap);
va_end(ap);
}
void TcpServer_debugx(const char *fmt,...)
{
va_list ap;
va_start(ap,fmt);
TcpServer_logx(LOG_DEBUG_LEVEL,NULL,fmt,ap);
va_end(ap);
}
需要源碼的同學請微信私聊,後期會更新至GitHub。