天天看點

《簡述Linux平台下伺服器架構的搭建思路(三)》

現在來陳述第二節遺留的問題,在多線程的場景下,假如主線程檢測到新連接配接的***已連接配接套接字***上有讀寫事件發生時,那麼此時如何把該套接字的IO事件交給子線程(Sub Reactor)去處理,一般情況下子線程是處于epoll_dispatch的調用上,如果沒有事件便處于阻塞的狀态。如何去喚醒阻塞在epoll_dispatch的子線程呢?這裡就涉及到socketpair技術的應用。

在初始化EventLoop對象時,建立一個socketpair對象(端對端的通信,類似于程序間通信的管道),把ScoketPair一端套接字上可讀事件添加到EventLoop的事件清單中去。

if(socketpair(AF_UNIX,SOCK_STREAM,0,eventLoop->socketPair) < 0)
 {
	printf("make socketPair error");
 }
 //socketPair[1]監聽EVENT_READ事件,每個線程都有一個socketpair
 struct channel *channel1 = channel_new(eventLoop->socketPair[1],EVENT_READ,handleWakeup,NULL,eventLoop);
 event_loop_add_channel_event(eventLoop,eventLoop->socketPair[1],channel1);
 
 void event_loop_wakeup(void *data)
 {
	struct event_loop *eventLoop = (struct event_loop *)data;
	char one = 'a';
	size_t n = write(eventLoop->socketPair[0],&one,sizof one);
	if(n < 0)
	{
		printf("event_loop_wakeup error");
		return ;
	}
	printf("event_loop_wakeup error");
}
//從socketpair[1]上讀取該字元,成功讀取,表示目前的EventLoop被喚醒
void handleWakeup(void *data)
{
  struct event_loop *eventLoop = (struct event_loop *)data;
  char one;
  size_t n = read(eventLoop->socketPair[1],&one,sizeof one);
  if(n != sizeof one)
  {
  	printf("handle wakeuo error");
  	return ;
  }
  printf("handle wakeup success");
}
           

當發現添加channel事件的EventLoop對象和目前線程不是同一個線程,那麼往被添加channel事件的EventLoop對象的socketpair[0]上面寫一個字元,随便什麼都可以。相當于用根“棍子”戳對方一下,嘿!大兄弟,有任務了,不要再阻塞了,趕緊醒過來。

介紹完socketPair對象,現在開始詳細介紹讀寫緩沖區buffer的構造和結構。

//buffer既可以表示發送緩沖區,也可以表示接收緩沖區。
struct buffer
{
   void *data;
   //目前讀取的位置
   int readIndex;
   //目前緩沖區中已經寫到的位置
   int writeIndex;
   //緩沖區大小
   int totalSize;
};

int buffer_front_spare_size(struct buffer *buffer1)
{
	//已經讀取過的空間,可以認為是空餘的空間,可以再次填充其它資料把它覆寫掉
	return buffer1->readIndex;
}
int buffer_readable_size(struct buffer *buffer1)
{
   //目前寫的位置減去讀取的位置,代表緩沖區中剩餘還未讀取的空間
	return buffer1->writeIndex - buffer1->readIndex;
}

int buffer_writeable_size(struct buffer *buffer1)
{
    //總大小減去目前已經寫的位置,代表還剩多少空間可寫
	return buffer1->totalSize - buffer1->writeIndex;
}

void make_room(struct buffer *buffer1,int size)
{
	if(buffer_writeable_size(buffer1) >= size)
	{
		return ;
	}
	int readableSize = 0;
	if(buffer_front_size(buffer1) + buffer_readable_size(buffer1) > size)
	{
	   //把未讀取完的資料拷貝到前面已經讀取完的空間去
	  readableSize = buffer_readable_size(buffer1);
	  for(int i =0;i < size;i++)
	  {
	  	memcpy(buffer1->data + i,buffer1->data + buffer1->readIndex,1);
	  }
	  //更新緩沖區的讀寫索引,從0開始
	  buffer1->readIndex = 0;
	  //目前寫到的位置,就是上一次還未讀取完的資料的大小,從0開始計算
	  buffer1->writeIndex = readableSize;
    }else
    {
    	//如果要拼接的資料超過了已讀和待讀空間大小的總和,那麼重新開辟新的空間
    	void *temp = (void *)realloc(buffer1->data,buffer1->total_size + size);
    	buffer1->data = (void *)temp;
    	buffer1->total_size += size;
	}
}
//拼接一段資料
void buffer_append(struct buffer *buffer1,void *data,int size)
{
	if(data != NULL)
	{
	   make_room(buffer1, size);
	   memcpy(buffer1->data + buffer1->writeIndex, data, size);
	   buffer1->writeIndex += size;
	}
}
//拼接一個字元
void buffer_append_char(struct buffer *buffer1,char data)
{
	make_room(buffer1,1);
	buffer1->data[buffer1->writeIndex++] = data;
}
//拼接一段字元
void buffer_append_string(struct buffer *buffer1,char * str)
{
   if(data != NULL)
   {
     int len = strlen(buffer1);
     buffer_append_data(buffer1, data, len);
   }
}

int buffer_socket_read(struct buffer *buffer1,int fd)
{
	//為什麼要使用readv和writev函數,因為用戶端發送古來的資料很有可能是零散的,不連續的,資料是分布在零散的空間内,剛好readv和writev可以在零散地在多個記憶體空間内進行原子性的資料讀寫操作。
	char additional_buffer[INIT_BUFFER_SIZE];
	//iovec 顧名思義就是IO vector
	struct iovec vec[2];
	int max_writeable = buffer_writeable_size(buffer1);
	//iov_base指向待讀取的第一段記憶體空間的資料
	vec[0].iov_base = buffer1->data + buffer1->writeIndex;
	//iov_len代表第一段待讀取記憶體空間資料的長度
	vec[0].iov_len = max_writeable;
	vec[1].iov_base = additional_buffer;
	vec[1].iov_len = sizeof(additional_buffer);
	int result = readv(fd,vec,2);
	if(result < 0)
	{
		return -1;
	}else if(result <= max_writeable)
	{
	   //如果總共的資料長度小于接受緩沖區内可寫的空間的長度,那麼就直接把接收到的資料拼接在剩餘的可寫空間内
		buffer1->writeIndex += result;
	}else
	{
		buffer1->writeIndex = buffer1->total_size;
		buffer_append(buffer1,additional_buffer,result - max_writeable);
	}
	return result;
}

char buffer_read_char(struct buffer *buffer1)
{
	char c = buffer1->data[buffer1->readIndex++];
	return c;
}

char * buffer_find_crlf(struct buffer *buffer1)
{
	char *CRLF = "\r\n";
	char *crlf = memmem(buffer1->data + buffer1->readIndex ,buffer_readable_size(buffer1),CRLF,2);
	return crlf;
}
           
《簡述Linux平台下伺服器架構的搭建思路(三)》

最後再來介紹下日志庫的設計,一個完整的系統肯定少不了日志庫的設計,在實際生産環境下,我們不可能使用printf向控制台執行日志的輸出列印,我們肯定需要自行設計一套日志庫出來,用于線上程式的追蹤和問題的排查。

#ifndef _LOG_H_
#define _LOG_H_
#include "stdarg.h"
#define LOG_DEBUG_LEVEL 0
#define LOG_MSG_LEVEL   1
#define LOG_WARN_LEVEL  2
#define LOG_ERR_LEVEL   3
void TcpServer_log(int security,const char *msg);
void TcpServer_logx(int security,const char *errstr,const char *fmt,va_list ap);
void TcpServer_Msgx(const char *fmt,...);
void TcpServer_debugx(const char *fmt,...);
void error(int status,int err,char *fmt,...);
#ifdef LOG_MSG(msg);
TcpServer_log(LOG_MSG_LEVEL,msg);
#endif
#ifdef LOG_ERR(msg)
TcpServer_log(LOG_ERR_LEVEL,msg);
#endif
#endif
           
#include "log.h"
#include "syslog.h"
#include <errno.h>
#define MAXLINE 4096 //一個日志檔案隻有4KB
void error(int status,int err,char *fmt,...)
{
    //va_list實際上是一個指針,用來指向error函數的各個不定參數。
	va_list ap;
	//va_start将va_list定義的指針指向error函數的第一個不定參數,也即是fmt參數後面的一個參數。
	va_start(ap,fmt);
	//将fmt(錯誤資訊類型),錯誤資訊内容拼接到标準錯誤中去(Linux環境下,有标準輸入、标準輸出、标準錯誤)
	vfprintf(stderr,fmt,ap);
	va_end(ap);
	if(err)
	{
		fprintf(stderr," :%s,(%d)\n",strerror(err),err);
	}
	if(status)
	{
	   exit(status);
	}
}

static void
err_doit(int errnoflag, int level, const char *fmt, va_list ap) 
{
	int errno_save, n;
	char buf[MAXLINE + 1];
	errno_save = errno;        
	/* value caller might want printed */
	vsnprintf(buf, MAXLINE, fmt, ap);
	n = strlen(buf);
	if (errnoflag)
	    snprintf(buf + n, MAXLINE - n, ": %s", strerror(errno_save));
	strcat(buf, "\n");
	fflush(stdout);        
	/* in case stdout and stderr are the same */
	fputs(buf, stderr);
	fflush(stderr);
	return;
}
void TcpServer_log(int security,const char *msg)
{
	const char *security_str;
	switch(security)
	{
	case LOG_DEBUG_LEVEL:
	   security_str = "debug";
	   break;
	case LOG_MSG_LEVEL:
		security_str = "msg";
		break;
	case LOG_WARN_LEVEL:
		security_str = "warn";
		break;
	case LOG_ERR_LEVEL:
		security_str = "err";
		break;
	default:
		security_str = "unkonow";
		break;
	}
	fprintf(stdout,"[%s] %s\n",security_str,msg);
}

void TcpServer_logx(int security,const char *errstr,const char *fmt,va_list ap)
{
	char buf[1024];
	size_t len;
	if(fmt != NULL)
	{
		vsnprintf(buf,sizeof(buf),fmt,ap);
	}else
	{
		buf[0] = '\0';
	}
	if(errstr)
	{
		len = strlen(buf);
		if(len < sizeof(buf) -3)
		{
			snprintf(buf + len,sizeof(buf) - len,":   %s",errstr);
		}
	}
	TcpServer_log(security,buf);
}
void TcpServer_msg(const char *fmt,...)
{
	va_list ap;
	va_start(ap,fmt);
	TcpServer_logx(LOG_MSG_LEVEL,NULL,fmt,ap);
	va_end(ap);
}

void TcpServer_msgx(const char *fmt, ...)
{
	va_list ap;   
	va_start(ap, fmt);
	TcpServer_logx(LOG_MSG_TYPE, NULL, fmt, ap);
	va_end(ap);
}

void TcpServer_debugx(const char *fmt,...)
{
	va_list ap;
	va_start(ap,fmt);
	TcpServer_logx(LOG_DEBUG_LEVEL,NULL,fmt,ap);
	va_end(ap);
}
           

需要源碼的同學請微信私聊,後期會更新至GitHub。

《簡述Linux平台下伺服器架構的搭建思路(三)》

繼續閱讀