
[Redis Design and Implementation][9] Replication (Redis 2.8)

SLAVEOF host port

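Setting up Redis master-slave replication is very convenient: you only need to give the slave the master's IP and port. To stop replication, run SLAVEOF NO ONE. See the official documentation for the details of this command.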

```c
void slaveofCommand(redisClient *c) {
    // Handle NO ONE first: tear down the current master-slave link
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) {
            replicationUnsetMaster();
            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
        }
    } else {
        long port;

        if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
            return;

        /* Check if we are already attached to the specified master */
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
            && server.masterport == port) {
            redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
            addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
            return;
        }
        /* There was no previous master or the user specified a different one,
         * we can continue. */
        // Configure the new master: this only records the settings and returns at once
        replicationSetMaster(c->argv[1]->ptr, port);
        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
            server.masterhost, server.masterport);
    }
    addReply(c,shared.ok);
}
```

```c
void replicationSetMaster(char *ip, int port) {
    sdsfree(server.masterhost);
    server.masterhost = sdsdup(ip);
    server.masterport = port;
    // Drop every existing replication link (old master and our own slaves)
    if (server.master) freeClient(server.master);
    disconnectSlaves(); /* Force our slaves to resync with us as well. */
    replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
    freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
    cancelReplicationHandshake();
    server.repl_state = REDIS_REPL_CONNECT;
    server.master_repl_offset = 0;
}
```

As you can see, SLAVEOF is asynchronous. It only records the new master and returns immediately. The actual work of connecting is done later by a timer (the cron job):

```c
/* Replication cron function -- used to reconnect to master and
 * to detect transfer failures. */
run_with_period(1000) replicationCron();
```
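The run_with_period macro turns the 1000 above into "once per second". serverCron itself runs server.hz times per second (10 by default), and a task with a longer period fires only on every N-th cron loop. Below is a minimal sketch of that arithmetic. It is modeled on, not copied from, the Redis macro. should_run and count_runs are hypothetical names, and hz and the loop counter are passed in explicitly instead of being read from the global server struct.

```c
#include <assert.h>

/* A task with period `ms` runs on every (ms / (1000/hz))-th cron loop,
 * or on every loop if its period is shorter than one cron tick. */
int should_run(int ms, int hz, long long cronloops) {
    int tick_ms = 1000 / hz;          /* hz = 10 -> one cron tick every 100 ms */
    if (ms <= tick_ms) return 1;
    return (cronloops % (ms / tick_ms)) == 0;
}

/* Counts how many times a task with period `ms` fires in `loops` cron ticks. */
int count_runs(int ms, int hz, long long loops) {
    int runs = 0;
    for (long long i = 0; i < loops; i++)
        if (should_run(ms, hz, i)) runs++;
    return runs;
}
```

With hz = 10, count_runs(1000, 10, 30) is 3: the task fires on loops 0, 10 and 20, which is once per second.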

So replicationCron is called once per second, and it performs periodic work that depends on the current replication state. When the state is REDIS_REPL_CONNECT, the logic is:

```c
void replicationCron(void) {
    /* ... */
    /* Check if we should connect to a MASTER */
    if (server.repl_state == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
        }
    }
    /* ... */
}
```

```c
int connectWithMaster(void) {
    int fd;

    fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
    if (fd == -1) {
        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
            AE_ERR)
    {
        close(fd);
        redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
        return REDIS_ERR;
    }

    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_s = fd;
    server.repl_state = REDIS_REPL_CONNECTING;
    return REDIS_OK;
}
```

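If the replication state is REDIS_REPL_CONNECT (the state set right after SLAVEOF), the cron connects to the master. Once the non-blocking connect has been started, it registers syncWithMaster as the socket's event handler and sets the state to REDIS_REPL_CONNECTING.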
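The key idea is that SLAVEOF only records state, and the cron does the work. The toy model below shows that split. Every name in it (slave_sketch, slaveof, cron_tick and the REPL_* enum) is hypothetical. The enum mirrors the REDIS_REPL_* names but is not the real definition, and connect_ok stands in for the outcome of the non-blocking connect.

```c
#include <assert.h>
#include <stdio.h>

/* Simplified slave-side states, named after the REDIS_REPL_* states. */
enum repl_state {
    REPL_NONE,          /* no master configured */
    REPL_CONNECT,       /* SLAVEOF done, must (re)connect */
    REPL_CONNECTING,    /* non-blocking connect in progress */
    REPL_RECEIVE_PONG,  /* PING sent, waiting for the reply */
    REPL_TRANSFER,      /* receiving the RDB payload */
    REPL_CONNECTED      /* streaming commands from the master */
};

struct slave_sketch {
    char masterhost[64];
    int masterport;
    enum repl_state state;
};

/* SLAVEOF host port: record the target and return immediately. */
void slaveof(struct slave_sketch *s, const char *host, int port) {
    snprintf(s->masterhost, sizeof(s->masterhost), "%s", host);
    s->masterport = port;
    s->state = REPL_CONNECT;
}

/* Called once per second by the (hypothetical) cron. */
void cron_tick(struct slave_sketch *s, int connect_ok) {
    if (s->state == REPL_CONNECT && connect_ok)
        s->state = REPL_CONNECTING;   /* the event handler takes over from here */
}
```

A failed connect leaves the state at REPL_CONNECT, so the next tick simply retries. That is the same retry behavior the error path in syncWithMaster relies on.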

Everyone knows PING. The Jedis pool uses it to check whether a connection is still valid, and if you run PING by hand, Redis answers with PONG.

Here the PING serves the same purpose: it checks that the connection to the master works and that the master can actually process commands.

```c
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    /* ... */
    if (server.repl_state == REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REDIS_REPL_RECEIVE_PONG;
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        syncWrite(fd,"PING\r\n",6,100);
        return;
    }
```

So in the REDIS_REPL_CONNECTING state the slave sends PING to the master and then waits for the PONG reply.

The PONG reply is handled in the same function:

```c
    /* Receive the PONG command. */
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        char buf[1024];

        /* Delete the readable event, we no longer need it now that there is
         * the PING reply to read. */
        aeDeleteFileEvent(server.el,fd,AE_READABLE);

        /* Read the reply with explicit timeout. */
        buf[0] = '\0';
        if (syncReadLine(fd,buf,sizeof(buf),
            server.repl_syncio_timeout*1000) == -1)
        {
            redisLog(REDIS_WARNING,
                "I/O error reading PING reply from master: %s",
                strerror(errno));
            goto error;
        }

        /* We accept only two replies as valid, a positive +PONG reply
         * (we just check for "+") or an authentication error.
         * Note that older versions of Redis replied with "operation not
         * permitted" instead of using a proper error code, so we test
         * both. */
        if (buf[0] != '+' &&
            strncmp(buf,"-NOAUTH",7) != 0 &&
            strncmp(buf,"-ERR operation not permitted",28) != 0)
        {
            redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
            goto error;
        } else {
            redisLog(REDIS_NOTICE,
                "Master replied to PING, replication can continue...");
        }
    }

    /* AUTH with the master if required. */
    if(server.masterauth) {
        err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
        if (err[0] == '-') {
            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
            sdsfree(err);
            goto error;
        }
        sdsfree(err);
    }

    /* Set the slave port, so that Master's INFO command can list the
     * slave listening port correctly. */
    {
        sds port = sdsfromlonglong(server.port);
        err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
                                         NULL);
        sdsfree(port);
        /* Ignore the error if any, not all the Redis versions support
         * REPLCONF listening-port. */
        if (err[0] == '-') {
            redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
        }
        sdsfree(err);
    }

    /* ... PSYNC / SYNC and the RDB transfer ... */

error:
    close(fd);
    server.repl_transfer_s = -1;
    server.repl_state = REDIS_REPL_CONNECT;
    return;
}
```

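- If reading the master's reply fails, the code jumps to error. There it closes the connection and resets the state to REDIS_REPL_CONNECT (the state right after SLAVEOF), so the next cron run reconnects.
- If the reply is read successfully, the slave checks that it is a positive reply such as +PONG (a -NOAUTH error is also let through). Any other reply also leads to error.
- If masterauth is configured, the slave sends AUTH to authenticate with the master. If authentication succeeds, synchronization continues. If it fails, the slave goes to error, closes the connection and waits for the next retry.
- Finally, the slave sends its own listening port to the master, which records it for reporting.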
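The acceptance rule for the PING reply fits in a small predicate. The sketch below reuses the exact prefixes checked in the excerpt. ping_reply_acceptable is a hypothetical helper name, not a Redis function.

```c
#include <assert.h>
#include <string.h>

/* Returns 1 if the first line read after PING lets the handshake go on:
 * any "+..." status reply, "-NOAUTH ..." (AUTH comes next), or the legacy
 * "-ERR operation not permitted" error used by older Redis versions. */
int ping_reply_acceptable(const char *buf) {
    return buf[0] == '+' ||
           strncmp(buf, "-NOAUTH", 7) == 0 ||
           strncmp(buf, "-ERR operation not permitted", 28) == 0;
}
```

A -NOAUTH reply passes on purpose. The AUTH step that follows decides whether replication can really continue.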

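As mentioned in the PONG handling above, once the PONG has been received (and authentication, if any, has completed), the slave sends a REPLCONF command that carries its listening port. When the master receives it, it stores the port in the slave_listening_port field of the client object that represents this slave.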

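```c
void replconfCommand(redisClient *c) {
    int j;
    /* ... */
    for (j = 1; j < c->argc; j+=2) {
        if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
            long port;

            if ((getLongFromObjectOrReply(c,c->argv[j+1],
                    &port,NULL) != REDIS_OK))
                return;
            c->slave_listening_port = port;
        }
        /* ... other options ... */
    }
    /* ... */
}
```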

At this point, the slave's port shows up when you run INFO on the master:

info replication
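The REPLCONF line itself is tiny. In 2.8, sendSynchronousCommand appears to send commands in the inline format: arguments joined by single spaces and terminated by CRLF. Assuming that format, the sketch below produces the bytes for a slave listening on port 6380. build_inline_command is a hypothetical helper, not part of Redis.

```c
#include <assert.h>
#include <stdarg.h>
#include <stddef.h>
#include <string.h>

/* Joins the NULL-terminated string arguments with single spaces and
 * appends CRLF. Returns the number of bytes written (without the NUL),
 * or -1 if the result does not fit in `out`. */
int build_inline_command(char *out, size_t cap, ...) {
    va_list ap;
    size_t len = 0;
    char *arg;

    va_start(ap, cap);
    while ((arg = va_arg(ap, char *)) != NULL) {
        size_t alen = strlen(arg);
        size_t sep = len ? 1 : 0;              /* space between arguments */
        if (len + sep + alen + 3 > cap) {      /* keep room for CRLF + NUL */
            va_end(ap);
            return -1;
        }
        if (sep) out[len++] = ' ';
        memcpy(out + len, arg, alen);
        len += alen;
    }
    va_end(ap);
    if (len + 3 > cap) return -1;
    memcpy(out + len, "\r\n", 3);              /* CRLF plus terminating NUL */
    return (int)(len + 2);
}
```

Example: build_inline_command(buf, sizeof(buf), "REPLCONF", "listening-port", "6380", (char *)NULL) fills buf with "REPLCONF listening-port 6380\r\n".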

Still in syncWithMaster: after sending its port, the slave attempts a partial resynchronization:

```c
    psync_result = slaveTryPartialResynchronization(fd);
    if (psync_result == PSYNC_CONTINUE) {
        redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
        return;
    }

    /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
     * and the server.repl_master_runid and repl_master_initial_offset are
     * already populated. */
    if (psync_result == PSYNC_NOT_SUPPORTED) {
        redisLog(REDIS_NOTICE,"Retrying with SYNC...");
        if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
            redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                strerror(errno));
            goto error;
        }
    }
```

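If the master does not support PSYNC, the slave sends SYNC for a full resynchronization. PSYNC is new in Redis 2.8, so the old full-sync path is not covered in detail here. Roughly, the master runs a BGSAVE and sends the resulting RDB file to the slave over the TCP connection, and the slave loads that RDB file.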

This article focuses on partial resynchronization:

```c
#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2

int slaveTryPartialResynchronization(int fd) {
    char *psync_runid;
    char psync_offset[32];
    sds reply;

    /* Initially set repl_master_initial_offset to -1 to mark the current
     * master run_id and offset as not valid. Later if we'll be able to do
     * a FULL resync using the PSYNC command we'll set the offset at the
     * right value, so that this information will be propagated to the
     * client structure representing the master into server.master. */
    server.repl_master_initial_offset = -1;

    if (server.cached_master) {
        psync_runid = server.cached_master->replrunid;
        snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
        redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
    } else {
        redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
        psync_runid = "?";
        memcpy(psync_offset,"-1",3);
    }

    /* Issue the PSYNC command */
    reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);

    if (!strncmp(reply,"+FULLRESYNC",11)) {
        char *runid = NULL, *offset = NULL;

        /* FULL RESYNC, parse the reply in order to extract the run id
         * and the replication offset. */
        runid = strchr(reply,' ');
        if (runid) {
            runid++;
            offset = strchr(runid,' ');
            if (offset) offset++;
        }
        if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
            redisLog(REDIS_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            /* This is an unexpected condition, actually the +FULLRESYNC
             * reply means that the master supports PSYNC, but the reply
             * format seems wrong. To stay safe we blank the master
             * runid to make sure next PSYNCs will fail. */
            memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
        } else {
            memcpy(server.repl_master_runid, runid, offset-runid-1);
            server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
            server.repl_master_initial_offset = strtoll(offset,NULL,10);
            redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
                server.repl_master_runid,
                server.repl_master_initial_offset);
        }
        /* We are going to full resync, discard the cached master structure. */
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }

    if (!strncmp(reply,"+CONTINUE",9)) {
        /* Partial resync was accepted, set the replication state accordingly */
        redisLog(REDIS_NOTICE,
            "Successful partial resynchronization with master.");
        sdsfree(reply);
        replicationResurrectCachedMaster(fd);
        return PSYNC_CONTINUE;
    }

    /* If we reach this point we received either an error since the master does
     * not understand PSYNC, or an unexpected reply from the master.
     * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */
    if (strncmp(reply,"-ERR",4)) {
        /* If it's not an error, log the unexpected event. */
        redisLog(REDIS_WARNING,
            "Unexpected reply to PSYNC from master: %s", reply);
    } else {
        redisLog(REDIS_NOTICE,
            "Master does not support PSYNC or is in "
            "error state (reply: %s)", reply);
    }
    sdsfree(reply);
    return PSYNC_NOT_SUPPORTED;
}
```
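The +FULLRESYNC parsing above boils down to one rule: the text between the first and second space is a 40-character run ID, and the rest is the offset. Below is a standalone sketch of that rule. RUN_ID_SIZE stands in for REDIS_RUN_ID_SIZE (40), and parse_fullresync is a hypothetical name.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define RUN_ID_SIZE 40   /* stands in for REDIS_RUN_ID_SIZE */

/* Parses "+FULLRESYNC <runid> <offset>". Returns 0 and fills the outputs
 * on success, -1 if the reply is not a well-formed +FULLRESYNC line. */
int parse_fullresync(const char *reply, char runid_out[RUN_ID_SIZE + 1],
                     long long *offset_out) {
    const char *runid, *offset;

    if (strncmp(reply, "+FULLRESYNC", 11) != 0) return -1;
    runid = strchr(reply, ' ');
    if (!runid) return -1;
    runid++;                                   /* skip the first space */
    offset = strchr(runid, ' ');
    if (!offset) return -1;
    offset++;                                  /* skip the second space */
    if (offset - runid - 1 != RUN_ID_SIZE) return -1;
    memcpy(runid_out, runid, RUN_ID_SIZE);
    runid_out[RUN_ID_SIZE] = '\0';
    *offset_out = strtoll(offset, NULL, 10);
    return 0;
}
```

As in the excerpt, a run ID of the wrong length is treated as a protocol error rather than truncated or padded.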

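- First, repl_master_initial_offset is set to -1 to mark the master's run ID and offset as not yet valid. If there is no cached master (for example on the very first synchronization), the slave sends run ID "?" and offset -1, which amounts to asking for a full resynchronization.
- The slave sends PSYNC to the master with the master run ID it knows and the offset it wants to continue from.
- If the master replies +FULLRESYNC, the slave stores the run ID and offset returned by the master and discards the previously cached master.

Once PSYNC has been accepted but a full resynchronization is still required (as it always is the first time), the steps are the same as in the old full sync: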

```c
    /* Prepare a suitable temp file for bulk transfer */
    while(maxtries--) {
        snprintf(tmpfile,256,
            "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
        if (dfd != -1) break;
        sleep(1);
    }
    if (dfd == -1) {
        redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
        goto error;
    }

    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        redisLog(REDIS_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }

    server.repl_state = REDIS_REPL_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_fd = dfd;
    server.repl_transfer_tmpfile = zstrdup(tmpfile);
    return;
```

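- Create a temporary file to hold the RDB data sent by the master.
- Register readSyncBulkPayload as the read handler that receives the RDB file from the master.
- Set the state to REDIS_REPL_TRANSFER and store the transfer bookkeeping: the temp file descriptor and name, the expected size and the bytes read so far.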
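The temp-file step can be sketched on its own. This version keeps the excerpt's "temp-<unixtime>.<pid>.rdb" pattern, its O_CREAT|O_WRONLY|O_EXCL flags and its sleep-and-retry loop. It uses time(NULL) in place of server.unixtime. Both function names are hypothetical.

```c
#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

/* Builds "temp-<unixtime>.<pid>.rdb" into name. */
void format_tmpfile_name(char *name, size_t cap, long unixtime, long pid) {
    snprintf(name, cap, "temp-%d.%ld.rdb", (int)unixtime, pid);
}

/* Tries up to maxtries times to create a brand-new file. O_EXCL makes
 * open() fail rather than reuse an existing file, and sleeping 1 s lets
 * the timestamp part of the name change before the next attempt.
 * Returns the file descriptor, or -1 if every attempt failed. */
int open_sync_tmpfile(char *name, size_t cap, int maxtries) {
    int dfd = -1;

    while (maxtries--) {
        format_tmpfile_name(name, cap, (long)time(NULL), (long)getpid());
        dfd = open(name, O_CREAT | O_WRONLY | O_EXCL, 0644);
        if (dfd != -1) break;
        sleep(1);
    }
    return dfd;
}
```

Including the PID in the name keeps two Redis instances that share a working directory from colliding.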

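readSyncBulkPayload receives the RDB file sent by the master and loads it into Redis. Roughly:

1. Read the payload length.
2. Read the payload and write it to the local temporary RDB file.
3. Once everything has been read, empty the Redis database.
4. Load the RDB file.
5. Set up the master -> slave channel: the connection to the master is wrapped in a client object (server.master), so that the slave keeps executing the commands the master streams to it.
6. Set the replication state to REDIS_REPL_CONNECTED and record the replication offset and related fields.
7. Re-enable AOF if needed (server.aof_state != REDIS_AOF_OFF).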
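Steps 1 and 2 amount to parsing a "$<size>" header and then counting bytes until the whole payload has arrived. The sketch below covers only those two steps and assumes the CRLF has already been stripped from the header line. parse_bulk_len and transfer_done are hypothetical names. The real function also handles I/O, error replies and loading.

```c
#include <assert.h>
#include <stdlib.h>

/* Parses a header such as "$1048576". Returns the payload size in bytes,
 * or -1 for an error reply ("-ERR ...") or anything that is not a valid
 * bulk length. */
long long parse_bulk_len(const char *line) {
    char *end;
    long long len;

    if (line[0] != '$') return -1;
    len = strtoll(line + 1, &end, 10);
    if (end == line + 1 || *end != '\0' || len < 0) return -1;
    return len;
}

/* Adds nread to the running total; returns 1 once size bytes are in. */
int transfer_done(long long size, long long *read_so_far, long long nread) {
    *read_so_far += nread;
    return *read_so_far >= size;
}
```

Only after transfer_done reports completion would the slave move on to emptying the database and loading the file.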

On the master side, both SYNC and PSYNC are handled by syncCommand:

```c
void syncCommand(redisClient *c) {
    /* ignore SYNC if already slave or in monitor mode */
    if (c->flags & REDIS_SLAVE) return;

    /* Refuse SYNC requests if we are a slave but the link with our master
     * is not ok... */
    if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
        addReplyError(c,"Can't SYNC while not connected with my master");
        return;
    }

    /* SYNC can't be issued when the server has pending data to send to
     * the client about already issued commands. We need a fresh reply
     * buffer registering the differences between the BGSAVE and the current
     * dataset, so that we can copy to other slaves if needed. */
    if (listLength(c->reply) != 0 || c->bufpos != 0) {
        addReplyError(c,"SYNC and PSYNC are invalid with pending output");
        return;
    }

    redisLog(REDIS_NOTICE,"Slave asks for synchronization");

    /* Try a partial resynchronization if this is a PSYNC command.
     * If it fails, we continue with usual full resynchronization, however
     * when this happens masterTryPartialResynchronization() already
     * replied with:
     *
     * +FULLRESYNC <runid> <offset>
     *
     * So the slave knows the new runid and offset to try a PSYNC later
     * if the connection with the master is lost. */
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        if (masterTryPartialResynchronization(c) == REDIS_OK) {
            server.stat_sync_partial_ok++;
            return; /* No full resync needed, return. */
        } else {
            char *master_runid = c->argv[1]->ptr;

            /* Increment stats for failed PSYNCs, but only if the
             * runid is not "?", as this is used by slaves to force a full
             * resync on purpose when they are not able to partially
             * resync. */
            if (master_runid[0] != '?') server.stat_sync_partial_err++;
        }
    } else {
        /* If a slave uses SYNC, we are dealing with an old implementation
         * of the replication protocol (like redis-cli --slave). Flag the client
         * so that we don't expect to receive REPLCONF ACK feedbacks. */
        c->flags |= REDIS_PRE_PSYNC;
    }

    /* Full resynchronization. */
    server.stat_sync_full++;

    /* Here we need to check if there is a background saving operation
     * in progress, or if it is required to start one */
    if (server.rdb_child_pid != -1) {
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save */
        redisClient *slave;
        listNode *ln;
        listIter li;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
        }
        if (ln) {
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */
            copyClientOutputBuffer(c,slave);
            c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
            redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences */
            c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
            redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
        }
    } else {
        /* Ok we don't have a BGSAVE in progress, let's start one */
        redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
            addReplyError(c,"Unable to perform background save");
            return;
        }
        c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        /* Flush the script cache for the new slave. */
        replicationScriptCacheFlush();
    }

    if (server.repl_disable_tcp_nodelay)
        anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
    c->repldbfd = -1;
    c->flags |= REDIS_SLAVE;
    server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
    listAddNodeTail(server.slaves,c);
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
        createReplicationBacklog();
    return;
}
```

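- First, the request is rejected if the client is already a slave, or if this server is itself a slave whose link with its own master is not up.
- For PSYNC, the master checks whether a full resync is needed. If it is not, syncCommand returns right away.
- If a full resync is needed, an RDB file has to be produced:
  - if a BGSAVE is already in progress, the master tries to reuse it. This works when another slave is already waiting for that BGSAVE to end, whose output buffer can be copied; otherwise the new slave waits for the next BGSAVE;
  - if no BGSAVE is running, the master starts one.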
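With the client and list plumbing stripped away, the BGSAVE decision is a three-way choice. In the sketch below, plan_full_sync and both enums are hypothetical stand-ins: bgsave_running mirrors the rdb_child_pid != -1 check, and states[] holds the replstate of the slaves already attached.

```c
#include <assert.h>
#include <stddef.h>

enum sync_plan {
    PLAN_REUSE_BGSAVE,   /* copy another slave's buffer, wait for BGSAVE end */
    PLAN_WAIT_NEXT,      /* a BGSAVE runs but nobody records diffs: wait */
    PLAN_START_BGSAVE    /* no BGSAVE running: start one now */
};

enum slave_repl_state { WAIT_BGSAVE_START, WAIT_BGSAVE_END, SEND_BULK, ONLINE };

enum sync_plan plan_full_sync(int bgsave_running,
                              const enum slave_repl_state *states, int n) {
    if (!bgsave_running) return PLAN_START_BGSAVE;
    for (int i = 0; i < n; i++)
        if (states[i] == WAIT_BGSAVE_END) return PLAN_REUSE_BGSAVE;
    return PLAN_WAIT_NEXT;
}
```

Reuse requires a slave in WAIT_BGSAVE_END because only such a slave has been accumulating, in its output buffer, every write since the fork. That buffer is what the new slave copies.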

How the master decides whether a full resync is needed:

```c
int masterTryPartialResynchronization(redisClient *c) {
    long long psync_offset, psync_len;
    char *master_runid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    /* Is the runid of this master the same advertised by the wannabe slave
     * via PSYNC? If runid changed this master is a different instance and
     * there is no way to continue. */
    if (strcasecmp(master_runid, server.runid)) {
        /* Run id "?" is used by slaves that want to force a full resync. */
        if (master_runid[0] != '?') {
            redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: "
                "Runid mismatch (Client asked for '%s', I'm '%s')",
                master_runid, server.runid);
        } else {
            redisLog(REDIS_NOTICE,"Full resync requested by slave.");
        }
        goto need_full_resync;
    }

    /* We still have the data our slave is asking for? */
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
       REDIS_OK) goto need_full_resync;
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        redisLog(REDIS_NOTICE,
            "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset);
        if (psync_offset > server.master_repl_offset) {
            redisLog(REDIS_WARNING,
                "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset.");
        }
        goto need_full_resync;
    }

    /* If we reached this point, we are able to perform a partial resync:
     * 1) Set client state to make it a slave.
     * 2) Inform the client we can continue with +CONTINUE
     * 3) Send the backlog data (from the offset to the end) to the slave. */
    c->flags |= REDIS_SLAVE;
    c->replstate = REDIS_REPL_ONLINE;
    c->repl_ack_time = server.unixtime;
    listAddNodeTail(server.slaves,c);
    /* We can't use the connection buffers since they are used to accumulate
     * new commands at this stage. But we are sure the socket send buffer is
     * empty so this write will never fail actually. */
    buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    redisLog(REDIS_NOTICE,
        "Partial resynchronization request accepted. Sending %lld bytes of backlog starting from offset %lld.", psync_len, psync_offset);
    /* Note that we don't need to set the selected DB at server.slaveseldb
     * to -1 to force the master to emit SELECT, since the slave already
     * has this state from the previous connection with the master. */

    refreshGoodSlavesCount();
    return REDIS_OK; /* The caller can return, no full resync needed. */

need_full_resync:
    /* We need a full resync for some reason... notify the client. */
    psync_offset = server.master_repl_offset;
    /* Add 1 to psync_offset if the replication backlog does not exist
     * as when it will be created later we'll increment the offset by one. */
    if (server.repl_backlog == NULL) psync_offset++;
    /* Again, we can't use the connection buffers (see above). */
    buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
                      server.runid,psync_offset);
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return REDIS_OK;
    }
    return REDIS_ERR;
}
```

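Two main cases lead to a full resync:

1. The run ID in the request does not match this server's run ID (this includes the "?" a slave sends to force a full resync).
2. The backlog kept by this master cannot provide the offset the slave asks for:
   - the master has no backlog yet, or
   - the backlog does not reach back far enough (the requested offset is outside the range it holds).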
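Both conditions fit in one predicate. In this sketch, can_partial_resync is a hypothetical name and has_backlog stands in for server.repl_backlog != NULL. The range check is the one from the excerpt.

```c
#include <assert.h>
#include <strings.h>

/* 1 if a PSYNC <asked_runid> <psync_offset> request can be served from
 * the backlog, 0 if the master must fall back to +FULLRESYNC. */
int can_partial_resync(const char *asked_runid, const char *my_runid,
                       int has_backlog, long long backlog_off,
                       long long histlen, long long psync_offset) {
    if (strcasecmp(asked_runid, my_runid) != 0) return 0;  /* also "?" */
    if (!has_backlog) return 0;
    if (psync_offset < backlog_off) return 0;              /* already overwritten */
    if (psync_offset > backlog_off + histlen) return 0;    /* beyond our history */
    return 1;
}
```

An offset equal to backlog_off + histlen is accepted. It means the slave is fully caught up, so the master has zero bytes of backlog to send.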

In addition, every time an RDB save finishes, updateSlavesWaitingBgsave is called to hand the saved RDB file to the waiting slaves.

```c
void updateSlavesWaitingBgsave(int bgsaveerr) {
    listNode *ln;
    int startbgsave = 0;
    listIter li;

    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
            startbgsave = 1;
            slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
        } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
            // The RDB file has been completely written
            struct redis_stat buf;

            if (bgsaveerr != REDIS_OK) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                continue;
            }
            // Open the RDB file that was just written
            if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                redis_fstat(slave->repldbfd,&buf) == -1) {
                freeClient(slave);
                redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                continue;
            }
            slave->repldboff = 0;
            slave->repldbsize = buf.st_size;
            slave->replstate = REDIS_REPL_SEND_BULK;
            aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
            // Start sending the RDB file
            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                freeClient(slave);
                continue;
            }
        }
    }
    if (startbgsave) {
        /* Since we are starting a new background save for one or more slaves,
         * we flush the Replication Script Cache to use EVAL to propagate every
         * new EVALSHA for the first time, since all the new slaves don't know
         * about previous scripts. */
        replicationScriptCacheFlush();
        if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
            listIter li;

            listRewind(server.slaves,&li);
            redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
            while((ln = listNext(&li))) {
                redisClient *slave = ln->value;

                if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                    freeClient(slave);
            }
        }
    }
}
```

At the end of this flow, the slave holds all the data the master had when the BGSAVE was taken. From then on, the master has to keep sending its commands to the slave.

```c
void call(redisClient *c, int flags) {
    /* ... */
    /* Propagate the command into the AOF and replication link */
    if (flags & REDIS_CALL_PROPAGATE) {
        int flags = REDIS_PROPAGATE_NONE;

        if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;
        if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;
        if (dirty)
            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
        if (flags != REDIS_PROPAGATE_NONE)
            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
    }
    /* ... */
}

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & REDIS_PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
```

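So whenever a command executed through call() modifies the dataset (dirty is non-zero), or the client forces propagation with REDIS_FORCE_REPL / REDIS_FORCE_AOF, the command is propagated to the AOF (if enabled) and to the slaves. Commands replayed while loading the AOF are not propagated.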

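replicationFeedSlaves has two main jobs:

- send the command to every attached slave (slaves still waiting for a BGSAVE to start are skipped);
- append the command to the backlog, so that later partial resynchronizations can be served from it.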

```c
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    listNode *ln;
    listIter li;
    int j, len;
    char llstr[REDIS_LONGSTR_SIZE];

    /* If there aren't slaves, and there is no backlog buffer to populate,
     * we can return ASAP. */
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

    /* We can't have slaves attached and no backlog. */
    redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

    /* Send SELECT command to every slave if needed. */
    if (server.slaveseldb != dictid) {
        robj *selectcmd;

        /* For a few DBs we have pre-computed SELECT command. */
        // Emit a SELECT whenever the DB changes, so no command hits the wrong DB
        if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
            selectcmd = shared.select[dictid];
        } else {
            int dictid_len;

            dictid_len = ll2string(llstr,sizeof(llstr),dictid);
            selectcmd = createObject(REDIS_STRING,
                sdscatprintf(sdsempty(),
                "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                dictid_len, llstr));
        }

        /* Add the SELECT command into the backlog. */
        // Write the SELECT command into the backlog
        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

        /* Send it to slaves. */
        // Send the SELECT command to every slave
        listRewind(slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;
            addReply(slave,selectcmd);
        }

        if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)
            decrRefCount(selectcmd);
    }
    server.slaveseldb = dictid;

    /* Write the command to the replication backlog if any. */
    // Write the command itself into the backlog
    if (server.repl_backlog) {
        char aux[REDIS_LONGSTR_SIZE+3];

        /* Add the multi bulk reply length. */
        aux[0] = '*';
        len = ll2string(aux+1,sizeof(aux)-1,argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);

        for (j = 0; j < argc; j++) {
            long objlen = stringObjectLen(argv[j]);

            /* We need to feed the buffer with the object as a bulk reply
             * not just as a plain string, so create the $..CRLF payload len
             * and add the final CRLF */
            aux[0] = '$';
            len = ll2string(aux+1,sizeof(aux)-1,objlen);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            feedReplicationBacklog(aux,len+3);
            feedReplicationBacklogWithObject(argv[j]);
            feedReplicationBacklog(aux+len+1,2);
        }
    }

    /* Write the command to every slave. */
    // Send the command to every slave
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        redisClient *slave = ln->value;

        /* Don't feed slaves that are still waiting for BGSAVE to start */
        if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;

        /* Feed slaves that are waiting for the initial SYNC (so these commands
         * are queued in the output buffer until the initial SYNC completes),
         * or are already in sync with the master. */

        /* Add the multi bulk length. */
        addReplyMultiBulkLen(slave,argc);

        /* Finally any additional argument that was not stored inside the
         * static buffer if any (from j to argc). */
        for (j = 0; j < argc; j++)
            addReplyBulk(slave,argv[j]);
    }
}
```
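The bytes written into the backlog, and sent to each slave through addReplyMultiBulkLen/addReplyBulk, follow the Redis multi-bulk format: "*<argc>\r\n", then "$<len>\r\n<arg>\r\n" for each argument. The sketch below shows that encoding. It is limited to NUL-free string arguments, whereas the real code is binary-safe. encode_multibulk is a hypothetical name.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encodes argv as a multi-bulk request. Returns the number of bytes
 * written (excluding the NUL), or -1 if `out` is too small. */
int encode_multibulk(char *out, size_t cap, int argc, const char **argv) {
    size_t pos;
    int n = snprintf(out, cap, "*%d\r\n", argc);

    if (n < 0 || (size_t)n >= cap) return -1;
    pos = (size_t)n;
    for (int j = 0; j < argc; j++) {
        n = snprintf(out + pos, cap - pos, "$%zu\r\n%s\r\n",
                     strlen(argv[j]), argv[j]);
        if (n < 0 || (size_t)n >= cap - pos) return -1;
        pos += (size_t)n;
    }
    return (int)pos;
}
```

Encoding { "SELECT", "1" } gives "*2\r\n$6\r\nSELECT\r\n$1\r\n1\r\n", the same layout as the format string the excerpt uses for SELECT.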

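Note: the backlog size is configurable (repl-backlog-size) and defaults to 1 MB. The backlog is a circular buffer, so once it is full, new data overwrites the oldest entries.

```c
#define REDIS_DEFAULT_REPL_BACKLOG_SIZE (1024*1024) /* 1mb */
```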
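To make "overwrite the oldest entries" concrete, here is a sketch of a circular backlog modeled on feedReplicationBacklog, which the code above calls but the article does not show. The struct and function names are hypothetical. The offset bookkeeping follows the fields used in masterTryPartialResynchronization (repl_backlog_off, repl_backlog_histlen, master_repl_offset), with offsets counted from 1.

```c
#include <assert.h>
#include <string.h>

struct backlog {
    char *buf;
    long long size;        /* repl_backlog_size */
    long long idx;         /* next write position in buf */
    long long histlen;     /* bytes of valid history, never above size */
    long long off;         /* replication offset of the oldest byte kept */
    long long master_off;  /* master_repl_offset */
};

void feed_backlog(struct backlog *b, const char *p, long long len) {
    b->master_off += len;
    while (len) {
        long long thislen = b->size - b->idx;   /* room before wrap-around */
        if (thislen > len) thislen = len;
        memcpy(b->buf + b->idx, p, (size_t)thislen);
        b->idx += thislen;
        if (b->idx == b->size) b->idx = 0;      /* wrap: overwrite the oldest */
        len -= thislen;
        p += thislen;
        b->histlen += thislen;
    }
    if (b->histlen > b->size) b->histlen = b->size;
    b->off = b->master_off - b->histlen + 1;    /* first byte ever has offset 1 */
}
```

Take an 8-byte buffer, write "abcde", then write "fghij". Memory then holds "ijcdefgh", the history length is 8, and the oldest kept offset is 3. The two oldest bytes are gone, so a slave asking for offset 1 or 2 would fall back to a full resync.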

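During the command-propagation phase, the slave sends REPLCONF ACK <offset> to the master once per second. It reports the slave's current replication offset so the master can detect lost commands. This is also sent from the cron: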

```c
    if (server.masterhost && server.master &&
        !(server.master->flags & REDIS_PRE_PSYNC))
        replicationSendAck();
```

```c
void replicationSendAck(void) {
    redisClient *c = server.master;

    if (c != NULL) {
        c->flags |= REDIS_MASTER_FORCE_REPLY;
        addReplyMultiBulkLen(c,3);
        addReplyBulkCString(c,"REPLCONF");
        addReplyBulkCString(c,"ACK");
        addReplyBulkLongLong(c,c->reploff);
        c->flags &= ~REDIS_MASTER_FORCE_REPLY;
    }
}
```

On its side, when the master receives this ACK it records the slave's acknowledged offset and the time of the ACK:

```c
        } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
            /* REPLCONF ACK is used by slave to inform the master the amount
             * of replication stream that it processed so far. It is an
             * internal only command that normal clients should never use. */
            long long offset;

            if (!(c->flags & REDIS_SLAVE)) return;
            if ((getLongLongFromObject(c->argv[j+1], &offset) != REDIS_OK))
                return;
            if (offset > c->repl_ack_off)
                c->repl_ack_off = offset;
            c->repl_ack_time = server.unixtime;
            /* Note: this command does not reply anything! */
            return;
        }
```

Also in the cron, every run disconnects slaves that have timed out:

```c
    /* Disconnect timedout slaves. */
    if (listLength(server.slaves)) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            redisClient *slave = ln->value;

            if (slave->replstate != REDIS_REPL_ONLINE) continue;
            if (slave->flags & REDIS_PRE_PSYNC) continue;
            if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
            {
                char ip[REDIS_IP_STR_LEN];
                int port;

                if (anetPeerToString(slave->fd,ip,sizeof(ip),&port) != -1) {
                    redisLog(REDIS_WARNING,
                        "Disconnecting timedout slave: %s:%d",
                        ip, slave->slave_listening_port);
                }
                freeClient(slave);
            }
        }
    }
```

Here repl_ack_time is refreshed by every ACK the slave sends, and server.repl_timeout defaults to 60 seconds:

```c
#define REDIS_REPL_TIMEOUT 60
```
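The ACK bookkeeping and the timeout check fit together as sketched below. All names (slave_info, on_replconf_ack, slave_timed_out) are hypothetical. REPL_TIMEOUT mirrors the 60-second default above, and times are plain seconds as with server.unixtime.

```c
#include <assert.h>
#include <time.h>

#define REPL_TIMEOUT 60   /* mirrors REDIS_REPL_TIMEOUT */

struct slave_info {
    int online;              /* replstate == REDIS_REPL_ONLINE */
    int pre_psync;           /* old SYNC-only slave: never sends ACKs */
    long long repl_ack_off;  /* highest offset acknowledged so far */
    time_t repl_ack_time;    /* when the last ACK arrived */
};

/* REPLCONF ACK <offset>: keep the highest offset, refresh the timestamp. */
void on_replconf_ack(struct slave_info *s, long long offset, time_t now) {
    if (offset > s->repl_ack_off) s->repl_ack_off = offset;
    s->repl_ack_time = now;
}

/* Cron check: only online, PSYNC-capable slaves can time out. */
int slave_timed_out(const struct slave_info *s, time_t now, time_t timeout) {
    if (!s->online || s->pre_psync) return 0;
    return (now - s->repl_ack_time) > timeout;
}
```

Pre-PSYNC slaves are skipped because they never send ACKs. Without that exception, every old-style slave would be dropped after 60 seconds.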

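After the master drops the connection, the slave still needs to be able to do a partial resync later. For this reason, freeClient treats the client that represents the master on the slave side specially: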

```c
void freeClient(redisClient *c) {
    /* ... */
    /* If it is our master that's being disconnected we should make sure
     * to cache the state to try a partial resynchronization later.
     *
     * Note that before doing this we make sure that the client is not in
     * some unexpected state, by checking its flags. */
    if (server.master && c->flags & REDIS_MASTER) {
        redisLog(REDIS_WARNING,"Connection with master lost.");
        if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|
                          REDIS_CLOSE_ASAP|
                          REDIS_BLOCKED|
                          REDIS_UNBLOCKED)))
        {
            replicationCacheMaster(c);
            return;
        }
    }
    /* ... */
}
```

```c
void replicationCacheMaster(redisClient *c) {
    listNode *ln;

    redisAssert(server.master != NULL && server.cached_master == NULL);
    redisLog(REDIS_NOTICE,"Caching the disconnected master state.");

    /* Remove from the list of clients, we don't want this client to be
     * listed by CLIENT LIST or processed in any way by batch operations. */
    // First remove the master's client object from the client list
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);

    /* Save the master. Server.master will be set to null later by
     * replicationHandleMasterDisconnection(). */
    // Keep the master's client object in cached_master
    server.cached_master = server.master;

    /* Remove the event handlers and close the socket. We'll later reuse
     * the socket of the new connection with the master during PSYNC. */
    // Tear down the connection and release its resources
    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    close(c->fd);

    /* Set fd to -1 so that we can safely call freeClient(c) later. */
    c->fd = -1;

    /* Invalidate the Peer ID cache. */
    if (c->peerid) {
        sdsfree(c->peerid);
        c->peerid = NULL;
    }

    /* Caching the master happens instead of the actual freeClient() call,
     * so make sure to adjust the replication state. This function will
     * also set server.master to NULL. */
    replicationHandleMasterDisconnection();
}
```

```c
void replicationHandleMasterDisconnection(void) {
    server.master = NULL;
    server.repl_state = REDIS_REPL_CONNECT;
    server.repl_down_since = server.unixtime;
    /* We lost connection with our master, force our slaves to resync
     * with us as well to load the new data set.
     *
     * If server.masterhost is NULL the user called SLAVEOF NO ONE so
     * slave resync is not needed. */
    if (server.masterhost != NULL) disconnectSlaves();
}
```

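After this handling, the disconnected slave's replication state is back to REDIS_REPL_CONNECT. Following the flow described earlier, the cron connects to the master and sends PING. When the slave then sends PSYNC, cached_master is set, so the command carries the previous master's run ID and offset. The corresponding slave and master logic is in the code shown above.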
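The arguments the reconnecting slave sends come from the choice made at the top of slaveTryPartialResynchronization. The sketch below summarizes that choice. cached_master here is a hypothetical struct with just the two fields involved, and psync_args is a hypothetical name.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

struct cached_master {
    const char *replrunid;  /* run ID of the master we were attached to */
    long long reploff;      /* last replication offset we processed */
};

/* Returns the run ID to send and writes the offset into offbuf:
 * <replrunid> <reploff+1> with a cached master, "? -1" otherwise. */
const char *psync_args(const struct cached_master *cm, char *offbuf, size_t cap) {
    if (cm) {
        snprintf(offbuf, cap, "%lld", cm->reploff + 1);  /* first missing byte */
        return cm->replrunid;
    }
    snprintf(offbuf, cap, "-1");
    return "?";
}
```

The +1 is there because reploff is the last byte the slave already has, and PSYNC asks for the first byte it is missing.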

Reposted from: https://coolex.info/blog/463.html