天天看點

Redis代碼閱讀3--Redis網絡監聽(3)



是介紹Redis網絡監聽的最後一篇文章,着重分析定時時間處理函數serverCron,這個函數其實已經和網絡監聽沒多大關系了,當時因為其綁定在Redis自定義的事件庫的定時事件上,是以放到一起來講。serverCron的這個函數對Redis的正常運作來說很重要,對于Redis的使用者來說,最重要的就是能夠迅速直覺地看到Redis的目前的運作狀況(keys,sizes,memory等),serverCron就能夠使使用者得知這些資訊,此外,serverCron這個方法定時周期地運作,還承擔了AOF

Write,VM Swap,BGSAVE,Rehash的操作,使得Redis的運作更加平穩。還是來直接通過代碼來分析:

Redis代碼閱讀3--Redis網絡監聽(3)

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {  

    int j, loops = server.cronloops;  

    REDIS_NOTUSED(eventLoop);  

    REDIS_NOTUSED(id);  

    REDIS_NOTUSED(clientData);  

    /* We take a cached value of the unix time in the global state because 

     * with virtual memory and aging there is to store the current time 

     * in objects at every object access, and accuracy is not needed. 

     * To access a global var is faster than calling time(NULL) */  

    server.unixtime = time(NULL);  

    /* We have just 22 bits per object for LRU information. 

     * So we use an (eventually wrapping) LRU clock with 10 seconds resolution. 

     * 2^22 bits with 10 seconds resoluton is more or less 1.5 years. 

     * 

     * Note that even if this will wrap after 1.5 years it's not a problem, 

     * everything will still work but just some object will appear younger 

     * to Redis. But for this to happen a given object should never be touched 

     * for 1.5 years. 

     * Note that you can change the resolution altering the 

     * REDIS_LRU_CLOCK_RESOLUTION define. 

     */  

    updateLRUClock();  

    /* We received a SIGTERM, shutting down here in a safe way, as it is 

     * not ok doing so inside the signal handler. */  

    if (server.shutdown_asap) {  

        if (prepareForShutdown() == REDIS_OK) exit(0);  

        redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");  

    }  

    /* Show some info about non-empty databases */  

    for (j = 0; j < server.dbnum; j++) {  

        long long size, used, vkeys;  

        size = dictSlots(server.db[j].dict);  

        used = dictSize(server.db[j].dict);  

        vkeys = dictSize(server.db[j].expires);  

        if (!(loops % 50) && (used || vkeys)) {  

            redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);  

            /* dictPrintStats(server.dict); */  

        }  

    /* We don't want to resize the hash tables while a bacground saving 

     * is in progress: the saving child is created using fork() that is 

     * implemented with a copy-on-write semantic in most modern systems, so 

     * if we resize the HT while there is the saving child at work actually 

     * a lot of memory movements in the parent will cause a lot of pages 

     * copied. */  

    if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1) {  

        if (!(loops % 10)) tryResizeHashTables();  

        if (server.activerehashing) incrementallyRehash();  

    /* Show information about connected clients */  

    if (!(loops % 50)) {  

        redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use",  

            listLength(server.clients)-listLength(server.slaves),  

            listLength(server.slaves),  

            zmalloc_used_memory());  

    /* Close connections of timedout clients */  

    if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)  

        closeTimedoutClients();  

    /* Check if a background saving or AOF rewrite in progress terminated */  

    if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {  

        int statloc;  

        pid_t pid;  

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {  

            if (pid == server.bgsavechildpid) {  

                backgroundSaveDoneHandler(statloc);  

            } else {  

                backgroundRewriteDoneHandler(statloc);  

            }  

            updateDictResizePolicy();  

    } else {  

        /* If there is not a background saving in progress check if 

         * we have to save now */  

         time_t now = time(NULL);  

         for (j = 0; j < server.saveparamslen; j++) {  

            struct saveparam *sp = server.saveparams+j;  

            if (server.dirty >= sp->changes &&  

                now-server.lastsave > sp->seconds) {  

                redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",  

                    sp->changes, sp->seconds);  

                rdbSaveBackground(server.dbfilename);  

                break;  

         }  

    /* Expire a few keys per cycle, only if this is a master. 

     * On slaves we wait for DEL operations synthesized by the master 

     * in order to guarantee a strict consistency. */  

    if (server.masterhost == NULL) activeExpireCycle();  

    /* Swap a few keys on disk if we are over the memory limit and VM 

     * is enbled. Try to free objects from the free list first. */  

    if (vmCanSwapOut()) {  

        while (server.vm_enabled && zmalloc_used_memory() >  

                server.vm_max_memory)  

        {  

            int retval = (server.vm_max_threads == 0) ?  

                        vmSwapOneObjectBlocking() :  

                        vmSwapOneObjectThreaded();  

            if (retval == REDIS_ERR && !(loops % 300) &&  

                zmalloc_used_memory() >  

                (server.vm_max_memory+server.vm_max_memory/10))  

            {  

                redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");  

            /* Note that when using threade I/O we free just one object, 

             * because anyway when the I/O thread in charge to swap this 

             * object out will finish, the handler of completed jobs 

             * will try to swap more objects if we are still out of memory. */  

            if (retval == REDIS_ERR || server.vm_max_threads > 0) break;  

    /* Replication cron function -- used to reconnect to master and 

     * to detect transfer failures. */  

    if (!(loops % 10)) replicationCron();  

    server.cronloops++;  

    return 100;  

}  

/* This function gets called every time Redis is entering the 

 * main loop of the event driven library, that is, before to sleep 

 * for ready file descriptors. */  

void beforeSleep(struct aeEventLoop *eventLoop) {  

    listNode *ln;  

    redisClient *c;  

    /* Awake clients that got all the swapped keys they requested */  

    if (server.vm_enabled && listLength(server.io_ready_clients)) {  

        listIter li;  

        listRewind(server.io_ready_clients,&li);  

        while((ln = listNext(&li))) {  

            c = ln->value;  

            struct redisCommand *cmd;  

            /* Resume the client. */  

            listDelNode(server.io_ready_clients,ln);  

            c->flags &= (~REDIS_IO_WAIT);  

            server.vm_blocked_clients--;  

            aeCreateFileEvent(server.el, c->fd, AE_READABLE,  

                readQueryFromClient, c);  

            cmd = lookupCommand(c->argv[0]->ptr);  

            redisAssert(cmd != NULL);  

            call(c,cmd);  

            resetClient(c);  

            /* There may be more data to process in the input buffer. */  

            if (c->querybuf && sdslen(c->querybuf) > 0)  

                processInputBuffer(c);  

    /* Try to process pending commands for clients that were just unblocked. */  

    while (listLength(server.unblocked_clients)) {  

        ln = listFirst(server.unblocked_clients);  

        redisAssert(ln != NULL);  

        c = ln->value;  

        listDelNode(server.unblocked_clients,ln);  

        c->flags &= ~REDIS_UNBLOCKED;  

        /* Process remaining data in the input buffer. */  

        if (c->querybuf && sdslen(c->querybuf) > 0)  

            processInputBuffer(c);  

    /* Write the AOF buffer on disk */  

    flushAppendOnlyFile();  

 i.    首先将server.cronloops的值賦給loops,server.cronloops指的是serverCron函數的運作次數,每運作一次serverCron函數,server.cronloops++,server.cronloops的内部執行邏輯随着server.cronloops值的不同而改變;

ii.    用server.unixtime = time(NULL)來儲存目前時間,因為在virtual memory and aging的時候,需要知道每次Object的access時間,但是這個時間不需要很精确,是以通過全局變量來擷取時間比調用time(NULL)快多了;

iii.    記錄Redis的最大記憶體使用量;如果收到了SIGTERM信号,則試圖終止Redis

iv.    serverCron方法每運作50次顯示Redis内各個非空的DB的使用情況(used,keys,sizes)及目前連接配接的clients,使用的記憶體大小;

v.    serverCron方法每運作10次,将試圖進行一次Rehash,如果一個a bacground saving正在進行,則不進行rehash,以免造成部分資料丢失;

vi.    關閉timeout的clients;

vii.    如果在執行BGSAVE期間,client執行了bgrewriteaof這個指令,則在serverCron将開始執行a scheduled AOF rewrite

viii.    如果目前Redis正在進行BGSAVE或者AOF rewrite,則check BGSAVE或者AOF rewrite是否已經終止,如果終止則調用相應的函數處理(backgroundSaveDoneHandler/backgroundRewriteDoneHandler),如果目前沒有BGSAVE或者AOF rewrite操作,則判斷是否進行此類操作,如果需要,則觸發此類操作;

ix.    如果有AOF buffer flush操作被暫停了,則每次調用serverCron的時候,恢複AOF buffer flush操作

x.    如果是Master,則周期性地使某些key(随即挑選的)過期,注意這個操作僅僅隻針對Master,如果是slaves,則隻有通過master的del操作來同步key,以做到強一緻性;

xi.    VM的Swap操作

xii.    每運作10次,進行replicationCron,如果存在slaves的話

xiii.    傳回100,表示serverCron方法每100毫秒被調用一次,這一點在processTimeEvent這個方法裡得以展現:

Redis代碼閱讀3--Redis網絡監聽(3)

if (retval != AE_NOMORE) {  

                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);  

                aeDeleteTimeEvent(eventLoop, id);  

通過上面的分析,ServerCron側重在Rehash,VM Swap, AOF write,BGSAVE等操作,而這些操作都是耗時,而且影響Redis對Clients的響應速度的,是以我們在實際應用的時候可以根據具體情況通過改變類似這樣的操作:”loops % 10“來決定上述耗時操作的執行頻率,有空我會測試下在不同頻率下,redis在壓力測試下的性能。

此次, Redis的網絡監聽部分都介紹完了。再回過頭來看前面提到的幾個問題:

1.         Redis支援 epoll, select, kquque,,通過配置檔案來決定采取哪一種

2.         支援檔案讀寫事件和定時事件

3.         采用數組來維護檔案事件,連結清單來儲存定時事件(在查找定時事件時,性能不高,有待提高)

4.         Redis Server單線程響應事件,按照先後順序來響應事件,是以單台 Redis伺服器的吞吐量會随着連接配接的 clients越來越多而下降,可以通過增加更多的 Redis伺服器來解決這個問題

5.         Redis在很多代碼裡都考慮到了盡快地響應各種事件,如在 aeProcessEvent裡面,輪詢的 wait時間等于目前時間和最近的定時事件響應時間的內插補點;每次進入輪詢 wait之前,在 beforesleep方法裡先響應剛剛

unblock的 clients等。

繼續閱讀