緩存是系統中很重要的組成部分。在很多系統中,會将那些耗時又耗資源才能得到的資料臨時儲存在一個能夠快速存取的地方,以此減少時間和資源的消耗。在傳統的緩存方式中,可以使用哈希表将查詢或者計算得到的資料暫時儲存起來,下次需要時可以直接從哈希表中取出這個資料而不用執行具體的查詢和計算操作,進而節省了時間。這種打表的方式有點類似于存儲結構裡的Cache和快表(TLB)的思路。但是哈希表存儲在擁有該哈希表的程序(process)的位址空間中;在網絡環境下,多個使用者同時存取伺服器,就會有多個程序,而這些程序不能共享哈希表,每個程序都需要維護自己的哈希表,這樣就造成了資料的冗餘和存儲空間的浪費。memcached就是為了解決這個問題而出現的。
memcached通俗地講就是将哈希表獨立出來,構成專門的緩存伺服器,放到了網上提供服務,讓程序共享這個哈希表。既然要通過遠端的伺服器擷取緩存資料,那傳統的網絡程式設計技術就會涉及到TCP/UDP的Socket程式設計。現在我們需要進行的工作就清楚了,将基于TCP協定的資料傳輸改為RDMA方式。
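首先,得先了解一下memcached本身的資料傳輸是如何完成的。memcached使用了libevent。libevent在Linux上底層使用epoll(其他平台則是kqueue、select等),在幾種I/O模型中屬于I/O多路複用(Reactor)模型,而不是Windows的完成端口(IOCP)模型。libevent是事件驅動的,使用時類似于Java或C#給事件注冊監聽器的方式:先為檔案描述符上的事件注冊回呼函數,事件發生時由libevent調用它,而不需要自己在一個循環中等待特定事件的發生。這樣,需要同步等待的消息循環方式就變成了異步的事件驅動方式。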
具體看下代碼。memcached的主要函數都在memcached.c這個檔案中。
/**
 * Server-wide statistics and status (the accompanying article describes this
 * struct as the server's state and cites memcached.h line 258).
 *
 * Fields that had no comment in the original are annotated from their names
 * only; every such note is marked as an assumption to be confirmed against
 * memcached.h.
 */
struct stats {
pthread_mutex_t mutex; /* NOTE(review): presumably guards the counters below — confirm */
/* NOTE(review): item/byte counters, judging by the names — confirm */
unsigned int curr_items;
unsigned int total_items;
uint64_t curr_bytes;
/* NOTE(review): connection counters, judging by the names — confirm */
unsigned int curr_conns;
unsigned int total_conns;
uint64_t rejected_conns;
uint64_t malloc_fails;
unsigned int reserved_fds;
unsigned int conn_structs;
/* NOTE(review): per-command counters and hit/miss counters, judging by the names — confirm */
uint64_t get_cmds;
uint64_t set_cmds;
uint64_t touch_cmds;
uint64_t get_hits;
uint64_t get_misses;
uint64_t touch_hits;
uint64_t touch_misses;
uint64_t evictions;
uint64_t reclaimed;
time_t started; /* when the process was started */
bool accepting_conns; /* whether we are currently accepting */
uint64_t listen_disabled_num;
unsigned int hash_power_level; /* Better hope it's not over 9000 */
uint64_t hash_bytes; /* size used for hash tables */
bool hash_is_expanding; /* If the hash table is being expanded */
uint64_t expired_unfetched; /* items reclaimed but never touched */
uint64_t evicted_unfetched; /* items evicted but never touched */
bool slab_reassign_running; /* slab reassign in progress */
uint64_t slabs_moved; /* times slabs were moved around */
uint64_t lru_crawler_starts; /* Number of item crawlers kicked off */
bool lru_crawler_running; /* crawl in progress */
uint64_t lru_maintainer_juggles; /* number of LRU bg pokes */
};
/**
 * memcached configuration (the accompanying article describes this struct as
 * memcached's settings and cites memcached.h line 299).
 *
 * Of these fields, the code shown in this file reads `backlog` (passed to
 * listen()) and `num_threads_per_udp` (number of UDP descriptors handed to
 * dispatch_conn_new()) in server_socket(). Notes on fields that had no
 * original comment are inferred from names and marked as assumptions.
 */
struct settings {
size_t maxbytes; /* NOTE(review): presumably the memory limit in bytes — confirm */
int maxconns; /* NOTE(review): presumably the connection limit — confirm */
int port; /* NOTE(review): presumably the TCP port — confirm */
int udpport; /* NOTE(review): presumably the UDP port — confirm */
char *inter; /* NOTE(review): presumably the interface/address to listen on — confirm */
int verbose;
rel_time_t oldest_live; /* ignore existing items older than this */
uint64_t oldest_cas; /* ignore existing items with CAS values lower than this */
int evict_to_free;
char *socketpath; /* path to unix socket if using local socket */
int access; /* access mask (a la chmod) for unix domain socket */
double factor; /* chunk size growth factor */
int chunk_size;
int num_threads; /* number of worker (without dispatcher) libevent threads to run */
int num_threads_per_udp; /* number of worker threads serving each udp socket */
char prefix_delimiter; /* character that marks a key prefix (for stats) */
int detail_enabled; /* nonzero if we're collecting detailed stats */
int reqs_per_event; /* Maximum number of io to process on each
io-event. */
bool use_cas;
enum protocol binding_protocol;
int backlog; /* backlog argument passed to listen() in server_socket() */
int item_size_max; /* Maximum item size, and upper end for slabs */
bool sasl; /* SASL on/off */
bool maxconns_fast; /* Whether or not to early close connections */
bool lru_crawler; /* Whether or not to enable the autocrawler thread */
bool lru_maintainer_thread; /* LRU maintainer background thread */
bool slab_reassign; /* Whether or not slab reassignment is allowed */
int slab_automove; /* Whether or not to automatically move slabs */
int hashpower_init; /* Starting hash power level */
bool shutdown_command; /* allow shutdown command */
int tail_repair_time; /* LRU tail refcount leak repair time */
bool flush_enabled; /* flush_all enabled */
char *hash_algorithm; /* Hash algorithm in use */
int lru_crawler_sleep; /* Microsecond sleep between items */
uint32_t lru_crawler_tocrawl; /* Number of items to crawl per run */
int hot_lru_pct; /* percentage of slab space for HOT_LRU */
int warm_lru_pct; /* percentage of slab space for WARM_LRU */
int crawls_persleep; /* Number of LRU crawls to run before sleeping */
bool expirezero_does_not_evict; /* exptime == 0 goes into NOEXP_LRU */
};
/**
 * Per-connection state (the accompanying article cites memcached.h line 421).
 *
 * One of these is set up by conn_new() for every socket being served; the
 * listening sockets created in server_socket() are chained through `next`.
 * Notes on fields that had no original comment are limited to what the code
 * shown in this file does with them, or are marked as assumptions.
 */
struct conn {
int sfd; /* socket descriptor; stored by conn_new(), passed to accept() by drive_machine() in the conn_listening state */
sasl_conn_t *sasl_conn;
bool authenticated;
enum conn_states state; /* current state; drive_machine() switches on this */
enum bin_substates substate;
rel_time_t last_cmd_time;
struct event event; /* libevent event that conn_new() sets up with event_set() and registers with event_add() */
short ev_flags;
short which; /** which events were just triggered */
char *rbuf; /** buffer to read commands into */
char *rcurr; /** but if we parsed some already, this is where we stopped */
int rsize; /** total allocated size of rbuf */
int rbytes; /** how much data, starting from rcur, do we have unparsed */
/* NOTE(review): wbuf/wcurr/wsize/wbytes look like the write-side counterparts of the read buffer fields above — confirm */
char *wbuf;
char *wcurr;
int wsize;
int wbytes;
/** which state to go into after finishing current write */
enum conn_states write_and_go;
void *write_and_free; /** free this memory after finishing writing */
char *ritem; /** when we read in an item's value, it goes here */
int rlbytes;
/* data for the nread state */
/**
* item is used to hold an item structure created after reading the command
* line of set/add/replace commands, but before we finished reading the actual
* data. The data is read into ITEM_data(item) to avoid extra copying.
*/
void *item; /* for commands set/add/replace */
/* data for the swallow state */
int sbytes; /* how many bytes to swallow */
/* data for the mwrite state */
struct iovec *iov;
int iovsize; /* number of elements allocated in iov[] */
int iovused; /* number of elements used in iov[] */
struct msghdr *msglist;
int msgsize; /* number of elements allocated in msglist[] */
int msgused; /* number of elements used in msglist[] */
int msgcurr; /* element in msglist[] being transmitted now */
int msgbytes; /* number of bytes in current msg */
item **ilist; /* list of items to write out */
int isize;
item **icurr;
int ileft;
char **suffixlist;
int suffixsize;
char **suffixcurr;
int suffixleft;
enum protocol protocol; /* which protocol this connection speaks */
enum network_transport transport; /* what transport is used by this connection */
/* data for UDP clients */
int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
struct sockaddr_in6 request_addr; /* udp: Who sent the most recent request */
socklen_t request_addr_size;
unsigned char *hdrbuf; /* udp packet headers */
int hdrsize; /* number of headers' worth of space is allocated */
bool noreply; /* True if the reply should not be sent. */
/* current stats command */
struct {
char *buffer;
size_t size;
size_t offset;
} stats;
/* Binary protocol stuff */
/* This is where the binary header goes */
protocol_binary_request_header binary_header;
uint64_t cas; /* the cas to return */
short cmd; /* current command being processed */
int opaque;
int keylen;
conn *next; /* Used for generating a list of conn structures */
LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
};
上面三個結構體分别在memcached.h的第258、299、421行,分别是伺服器的統計狀态(stats)、memcached的設定(settings)和每個網絡連接的狀态(conn)。不過我目前還不清楚每個成員的具體含義。
memcached.c中第4392行建立socket套接字函數。
/**
 * Create a socket for the given address description and put it into
 * non-blocking mode.
 *
 * @param ai  address entry supplying the family, socket type and protocol
 * @return the new descriptor, or -1 if socket() or either fcntl() call fails
 *         (on fcntl failure the descriptor is closed before returning).
 */
static int new_socket(struct addrinfo *ai) {
    int fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
    if (fd == -1) {
        return -1;
    }

    /* Read the current status flags, then write them back with O_NONBLOCK added. */
    int status_flags = fcntl(fd, F_GETFL, 0);
    if (status_flags < 0 || fcntl(fd, F_SETFL, status_flags | O_NONBLOCK) < 0) {
        perror("setting O_NONBLOCK");
        close(fd);
        return -1;
    }

    return fd;
}
/**
 * Create, bind and register the listening socket(s) for one interface/port.
 *
 * The interface/port pair is resolved with getaddrinfo(); for every address
 * returned a non-blocking socket is created with new_socket() and bound.
 * TCP sockets are then put into listen() state and wrapped in a
 * conn_listening connection on main_base; UDP sockets are handed to
 * dispatch_conn_new(), one descriptor per worker thread.
 *
 * @param interface        host/address string passed to getaddrinfo()
 * @param port             port number; -1 is mapped to 0
 * @param transport        tested with IS_UDP() to choose SOCK_DGRAM or SOCK_STREAM
 * @param portnumber_file  if non-NULL, the locally bound port of every
 *                         AF_INET/AF_INET6 socket is written to this stream
 * @return 0 if at least one address was bound, 1 otherwise (also 1 on a
 *         getaddrinfo() failure, a bind() failure other than EADDRINUSE, or
 *         a listen() failure). Exits the process when descriptors run out
 *         (EMFILE) or the listening connection cannot be created.
 */
static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
    int sfd;
    struct addrinfo *ai;
    struct addrinfo *next;
    /* Lookup hints: passive (bindable) addresses of any family. */
    struct addrinfo hints = { .ai_flags = AI_PASSIVE,
                              .ai_family = AF_UNSPEC };
    char port_buf[NI_MAXSERV];
    int error;
    int success = 0;
    int flags = 1;

    /* Datagram socket for UDP, stream socket otherwise. */
    hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;

    if (port == -1) {
        port = 0;
    }

    /*
     * This lookup is not optional error handling: it is the only place that
     * initializes `ai`. Without it the loop below and freeaddrinfo() would
     * read an uninitialized pointer.
     */
    snprintf(port_buf, sizeof(port_buf), "%d", port);
    error = getaddrinfo(interface, port_buf, &hints, &ai);
    if (error != 0) {
        if (error != EAI_SYSTEM)
            fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
        else
            perror("getaddrinfo()");
        return 1;
    }

    for (next = ai; next; next = next->ai_next) {
        conn *listen_conn_add;

        /* Create the (non-blocking) socket for this address. */
        if ((sfd = new_socket(next)) == -1) {
            /* getaddrinfo can return "junk" addresses,
             * we make sure at least one works before erroring.
             */
            if (errno == EMFILE) {
                /* ...unless we're out of fds */
                perror("server_socket");
                exit(EX_OSERR);
            }
            continue;
        }

        /* IPv6-only handling, left disabled in this walkthrough. */
#if 0
#ifdef IPV6_V6ONLY
        if (next->ai_family == AF_INET6) {
            error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags));
            if (error != 0) {
                perror("setsockopt");
                close(sfd);
                continue;
            }
        }
#endif
#endif

        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));

        /*
         * Socket tuning, left disabled in this walkthrough. Note that this is
         * more than error handling: it sets the UDP send buffer and the TCP
         * keep-alive, linger and no-delay options. Re-enabling it also needs
         * `struct linger ling = {0, 0};` declared at the top of the function.
         */
#if 0
        if (IS_UDP(transport)) {
            maximize_sndbuf(sfd);
        } else {
            error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
            if (error != 0)
                perror("setsockopt");
            error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
            if (error != 0)
                perror("setsockopt");
            error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
            if (error != 0)
                perror("setsockopt");
        }
#endif

        /* Bind the socket to this address. */
        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
            if (errno != EADDRINUSE) {
                perror("bind()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            /* Address already in use: skip it and try the next one. */
            close(sfd);
            continue;
        } else {
            success++;
            /* Only stream (TCP) sockets are put into the listening state. */
            if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
                perror("listen()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            if (portnumber_file != NULL &&
                (next->ai_addr->sa_family == AF_INET ||
                 next->ai_addr->sa_family == AF_INET6)) {
                union {
                    struct sockaddr_in in;
                    struct sockaddr_in6 in6;
                } my_sockaddr;
                socklen_t len = sizeof(my_sockaddr);
                /*
                 * getsockname() reports the local address the socket was
                 * bound to; no peer connection is needed for that. It is
                 * used here to learn the actual port number (the requested
                 * port may have been 0, see above) and record it in
                 * portnumber_file.
                 */
                if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) {
                    if (next->ai_addr->sa_family == AF_INET) {
                        fprintf(portnumber_file, "%s INET: %u\n",
                                IS_UDP(transport) ? "UDP" : "TCP",
                                ntohs(my_sockaddr.in.sin_port));
                    } else {
                        fprintf(portnumber_file, "%s INET6: %u\n",
                                IS_UDP(transport) ? "UDP" : "TCP",
                                ntohs(my_sockaddr.in6.sin6_port));
                    }
                }
            }
        }

        if (IS_UDP(transport)) {
            int c;

            for (c = 0; c < settings.num_threads_per_udp; c++) {
                /* Allocate one UDP file descriptor per worker thread;
                 * this allows "stats conns" to separately list multiple
                 * parallel UDP requests in progress.
                 *
                 * The dispatch code round-robins new connection requests
                 * among threads, so this is guaranteed to assign one
                 * FD to each thread.
                 */
                int per_thread_fd = c ? dup(sfd) : sfd;
                dispatch_conn_new(per_thread_fd, conn_read,
                                  EV_READ | EV_PERSIST,
                                  UDP_READ_BUFFER_SIZE, transport);
            }
        } else {
            /* Wrap the listening TCP socket in a conn registered on main_base. */
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            /* Push it onto the front of the listen_conn list. */
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
        }
    }

    freeaddrinfo(ai);

    /* Return zero iff we detected no errors in starting up connections */
    return success == 0;
}
下面看一下conn_new()的過程,為清晰呈現,只貼了主要功能性部分(省略了變數宣告、錯誤處理等,是以下面的片段無法直接編譯)。
/* The event base that events are attached to; described by the article as libevent's most important data structure. */
static struct event_base *main_base;//libevent's central data structure
/* NOTE(review): abridged excerpt — an assignment like this is not valid at file scope in C, so in the real source it must run inside a function (presumably during start-up — confirm). */
main_base = event_init();
/*
 * Abridged excerpt of conn_new(): records the descriptor and initial state in
 * a conn and registers a libevent event for it on the given event base.
 *
 * NOTE(review): `c` is never declared in this excerpt and nothing is
 * returned although the return type is conn *; the omitted code presumably
 * obtains the conn and returns it — confirm against the full source.
 * read_buffer_size and transport are not used by the lines shown.
 */
conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base) {
c->sfd = sfd;
c->state = init_state;
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);//event_handler is the callback; it fires when an event in event_flags occurs on sfd, with c as its argument
/* Attach the event to the caller-supplied base, then activate it. */
event_base_set(base, &c->event);
event_add(&c->event, 0);
}
/*
 * Abridged excerpt of the libevent callback that conn_new() registers with
 * event_set(); it runs the connection's state machine.
 *
 * NOTE(review): `c` is not declared in this excerpt; since conn_new() passes
 * `(void *)c` as the callback argument, it is presumably recovered from
 * `arg` in the omitted code — confirm.
 */
void event_handler(const int fd, const short which, void *arg) {
drive_machine(c);
}
/*
 * Abridged excerpt of the connection state machine: loops until `stop` is
 * set and acts according to c->state. Only the conn_listening case is shown.
 *
 * NOTE(review): this excerpt is not compilable as written — the locals
 * (stop, sfd, addr, addrlen, flags) are not declared, the fcntl() expression
 * below has unbalanced parentheses, and the closing braces of the switch and
 * the while loop are missing. Consult the full source before relying on it.
 */
static void drive_machine(conn *c) {
while (!stop) {
switch(c->state) {//choose what to do based on the connection state
case conn_listening:
/* Accept a pending connection on the listening socket. */
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
/* NOTE(review): these two lines look like the remains of a condition that adds O_NONBLOCK to the accepted socket and checks both fcntl() calls for failure — confirm. */
flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
fcntl(sfd, F_SETFL, flags | O_NONBLOCK);
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,//dispatch the newly arrived connection
DATA_BUFFER_SIZE, tcp_transport);
}
//......
//What follows are operations such as reading/writing data through the buffers and receiving commands from the client; these do not seem to be within the scope of our competition.
以上就是我近期看的一些内容,跟大家分享一下,希望有所幫助。理解錯的地方歡迎大家幫我指出。下面這個連結是memcached的代碼解析,有比較詳細的注釋,可以參考。
http://blog.csdn.net/column/details/lc-memcached.html?&page=1