slaveof host port
redis的主從複製設定非常方便,只需要在從伺服器上設定主伺服器的ip和端口即可。如果需要關閉主從同步,只需要執行slaveof no one即可。該指令的具體描述見官方文檔。
void slaveofcommand(redisclient *c) {
// 先處理no one,解除現有的主從同步
if (!strcasecmp(c->argv[1]->ptr,“no“) &&
!strcasecmp(c->argv[2]->ptr,“one“)) {
if (server.masterhost) {
replicationunsetmaster();
redislog(redis_notice,“master mode enabled (user request)“);
}
} else {
long port;
if ((getlongfromobjectorreply(c, c->argv[2], &port, null) != redis_ok))
return;
/* check if we are already attached to the specified slave */
if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
&& server.masterport == port) {
redislog(redis_notice,“slave of would result into synchronization with the master we are already connected with. no operation performed.“);
addreplysds(c,sdsnew(“+ok already connected to specified master\r\n“));
/* there was no previous master or the user specified a different one,
* we can continue. */
// 設定新的主從同步,這裡隻是設定,然後直接傳回
replicationsetmaster(c->argv[1]->ptr, port);
redislog(redis_notice,“slave of %s:%d enabled (user request)“,
server.masterhost, server.masterport);
addreply(c,shared.ok);
void replicationsetmaster(char *ip, int port) {
sdsfree(server.masterhost);
server.masterhost = sdsdup(ip);
server.masterport = port;
//如果目前slave以前是master,斷開所有原先的連接配接
if (server.master) freeclient(server.master);
disconnectslaves(); /* force our slaves to resync with us as well. */
replicationdiscardcachedmaster(); /* don’t try a psync. */
freereplicationbacklog(); /* don’t allow our chained slaves to psync. */
cancelreplicationhandshake();
server.repl_state = redis_repl_connect;
server.master_repl_offset = 0;
可以看到,slaveof指令是一個異步指令,執行的時候只是設定了新的主伺服器,然後就立即傳回結果了。真正的連接等操作,是在定時器中執行的。
/* replication cron function — used to reconnect to master and
* to detect transfer failures. */
run_with_period(1000) replicationcron();
定時器每隔1秒鐘會調用replicationcron函數,該函數會根據狀態執行定時操作。當狀態為redis_repl_connect的時候,執行邏輯為:
void replicationcron(void) {
…
/* check if we should connect to a master */
if (server.repl_state == redis_repl_connect) {
redislog(redis_notice,“connecting to master %s:%d“,
if (connectwithmaster() == redis_ok) {
redislog(redis_notice,“master <-> slave sync started“);
int connectwithmaster(void) {
int fd;
fd = anettcpnonblockconnect(null,server.masterhost,server.masterport);
if (fd == –1) {
redislog(redis_warning,“unable to connect to master: %s“,
strerror(errno));
return redis_err;
if (aecreatefileevent(server.el,fd,ae_readable|ae_writable,syncwithmaster,null) ==
ae_err)
{
close(fd);
redislog(redis_warning,“can’t create readable event for sync“);
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_s = fd;
server.repl_state = redis_repl_connecting;
return redis_ok;
如果發現目前主從狀態是redis_repl_connect(剛執行slaveof的時候設定的),就會去連接主伺服器。當socket連接建立之後,會註冊syncwithmaster這個回調,並且設定主從狀態為redis_repl_connecting。
ping指令大家都很熟悉了,jedis pool中用來檢測目前連接是否有效,用的就是這個指令。手工執行ping指令,redis會傳回一個pong作為響應。
這裡發送ping指令,主要也是為了檢測目前和master的連接是否正常,以及master是否能夠正常處理指令。
void syncwithmaster(aeeventloop *el, int fd, void *privdata, int mask) {
if (server.repl_state == redis_repl_connecting) {
redislog(redis_notice,“non blocking connect for sync fired the event.“);
/* delete the writable event so that the readable event remains
* registered and we can wait for the pong reply. */
aedeletefileevent(server.el,fd,ae_writable);
server.repl_state = redis_repl_receive_pong;
/* send the ping, don’t check for errors at all, we have the timeout
* that will take care about this. */
syncwrite(fd,“ping\r\n“,6,100);
這裡當狀态是redis_repl_connecting的時候,向master發送了ping指令,然後就等待master傳回pong的響應。
pong響應也是在這個函數中進行處理的:
/* receive the pong command. */
if (server.repl_state == redis_repl_receive_pong) {
char buf[1024];
/* delete the readable event, we no longer need it now that there is
* the ping reply to read. */
aedeletefileevent(server.el,fd,ae_readable);
/* read the reply with explicit timeout. */
buf[0] = ‘\0‘;
if (syncreadline(fd,buf,sizeof(buf),
server.repl_syncio_timeout*1000) == –1)
redislog(redis_warning,
“i/o error reading ping reply from master: %s“,
goto error;
/* we accept only two replies as valid, a positive +pong reply
* (we just check for “+”) or an authentication error.
* note that older versions of redis replied with “operation not
* permitted” instead of using a proper error code, so we test
* both. */
if (buf[0] != ‘+‘ &&
strncmp(buf,“-noauth“,7) != 0 &&
strncmp(buf,“-err operation not permitted“,28) != 0)
redislog(redis_warning,“error reply to ping from master: ‘%s‘“,buf);
redislog(redis_notice,
“master replied to ping, replication can continue…“);
/* auth with the master if required. */
if(server.masterauth) {
err = sendsynchronouscommand(fd,“auth“,server.masterauth,null);
if (err[0] == ‘–‘) {
redislog(redis_warning,“unable to auth to master: %s“,err);
sdsfree(err);
/* set the slave port, so that master’s info command can list the
* slave listening port correctly. */
sds port = sdsfromlonglong(server.port);
err = sendsynchronouscommand(fd,“replconf“,“listening-port“,port,
null);
sdsfree(port);
/* ignore the error if any, not all the redis versions support
* replconf listening-port. */
redislog(redis_notice,“(non critical) master does not understand replconf listening-port: %s“, err);
error:
server.repl_transfer_s = –1;
如果讀取master傳回值失敗,直接跳轉到error,關閉連接,重新將連接狀態設定為redis_repl_connect(也就是slaveof執行完成之後的狀態),等待下次定時器重連;
讀取響應成功,判斷響應值是否為pong,如果為pong則表示連接檢測完成,將發送目前slave的端口資訊,用於master同步資料;
如果判斷是需要認證,且設定了masterauth,則發送auth指令,向master發起授權。
如果授權成功,将繼續後續的同步流程
如果授權失敗,則進入error流程,關閉連接配接,并等待下次重試
前面的pong響應流程裡面已經提到了,當正确接收到了pong響應,或者是完成了認證之後,slave會發起一個replconf指令,将自己的端口發送給master。 master接受到這個指令之後,将slave的端口資訊記錄到這個slave對應的client對象的slave_listening_port屬性中。
void replconfcommand(redisclient *c) {
if (!strcasecmp(c->argv[j]->ptr,“listening-port“)) {
if ((getlongfromobjectorreply(c,c->argv[j+1],
&port,null) != redis_ok))
c->slave_listening_port = port;
這時,在master上通過info指令,就可以看見slave的端口資訊:
info replication
還是在syncwithmaster函數中。發送完端口資訊之後,slave會嘗試進行增量同步:
psync_result = slavetrypartialresynchronization(fd);
if (psync_result == psync_continue) {
redislog(redis_notice, “master <-> slave sync: master accepted a partial resynchronization.“);
/* fall back to sync if needed. otherwise psync_result == psync_fullresync
* and the server.repl_master_runid and repl_master_initial_offset are
* already populated. */
if (psync_result == psync_not_supported) {
redislog(redis_notice,“retrying with sync…“);
if (syncwrite(fd,“sync\r\n“,6,server.repl_syncio_timeout*1000) == –1) {
redislog(redis_warning,“i/o error writing to master: %s“,
如果不支援增量同步,會向master發送sync指令做全量同步。增量同步是從redis2.8開始支援的,所以這裡不再詳述全量同步。大致的操作流程就是master做一次bgsave,然後將儲存的rdb檔案通過tcp連接發送給slave,slave加載這個rdb檔案。
這裡着重了解增量同步:
#define psync_continue 0
#define psync_fullresync 1
#define psync_not_supported 2
int slavetrypartialresynchronization(int fd) {
char *psync_runid;
char psync_offset[32];
sds reply;
/* initially set repl_master_initial_offset to -1 to mark the current
* master run_id and offset as not valid. later if we’ll be able to do
* a full resync using the psync command we’ll set the offset at the
* right value, so that this information will be propagated to the
* client structure representing the master into server.master. */
server.repl_master_initial_offset = –1;
if (server.cached_master) {
psync_runid = server.cached_master->replrunid;
snprintf(psync_offset,sizeof(psync_offset),“%lld“, server.cached_master->reploff+1);
redislog(redis_notice,“trying a partial resynchronization (request %s:%s).“, psync_runid, psync_offset);
redislog(redis_notice,“partial resynchronization not possible (no cached master)“);
psync_runid = “?“;
memcpy(psync_offset,“-1“,3);
/* issue the psync command */
reply = sendsynchronouscommand(fd,“psync“,psync_runid,psync_offset,null);
if (!strncmp(reply,“+fullresync“,11)) {
char *runid = null, *offset = null;
/* full resync, parse the reply in order to extract the run id
* and the replication offset. */
runid = strchr(reply,‘ ‘);
if (runid) {
runid++;
offset = strchr(runid,‘ ‘);
if (offset) offset++;
if (!runid || !offset || (offset-runid-1) != redis_run_id_size) {
“master replied with wrong +fullresync syntax.“);
/* this is an unexpected condition, actually the +fullresync
* reply means that the master supports psync, but the reply
* format seems wrong. to stay safe we blank the master
* runid to make sure next psyncs will fail. */
memset(server.repl_master_runid,0,redis_run_id_size+1);
memcpy(server.repl_master_runid, runid, offset-runid-1);
server.repl_master_runid[redis_run_id_size] = ‘\0‘;
server.repl_master_initial_offset = strtoll(offset,null,10);
redislog(redis_notice,“full resync from master: %s:%lld“,
server.repl_master_runid,
server.repl_master_initial_offset);
/* we are going to full resync, discard the cached master structure. */
replicationdiscardcachedmaster();
sdsfree(reply);
return psync_fullresync;
if (!strncmp(reply,“+continue“,9)) {
/* partial resync was accepted, set the replication state accordingly */
“successful partial resynchronization with master.“);
replicationresurrectcachedmaster(fd);
return psync_continue;
/* if we reach this point we receied either an error since the master does
* not understand psync, or an unexpected reply from the master.
* return psync_not_supported to the caller in both cases. */
if (strncmp(reply,“-err“,4)) {
/* if it’s not an error, log the unexpected event. */
“unexpected reply to psync from master: %s“, reply);
“master does not support psync or is in “
“error state (reply: %s)“, reply);
return psync_not_supported;
首先設定同步偏移量為-1,表示第一次增量更新(其實也就是個全量更新)
向master發送psync指令,告知master自己的id和同步偏移量
master傳回全量更新(fullresync),儲存master傳回的偏移量和運作id,清除之前緩存的master資訊。確認可以增量同步後,由於第一次是全量同步,所以操作和原全量同步相同:
/* prepare a suitable temp file for bulk transfer */
while(maxtries–) {
snprintf(tmpfile,256,
“temp-%d.%ld.rdb“,(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,o_creat|o_wronly|o_excl,0644);
if (dfd != –1) break;
sleep(1);
if (dfd == –1) {
redislog(redis_warning,“opening the temp file needed for master <-> slave synchronization: %s“,strerror(errno));
/* setup the non blocking download of the bulk file. */
if (aecreatefileevent(server.el,fd, ae_readable,readsyncbulkpayload,null)
== ae_err)
“can’t create readable event for sync: %s (fd=%d)“,
strerror(errno),fd);
server.repl_state = redis_repl_transfer;
server.repl_transfer_size = –1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_fd = dfd;
server.repl_transfer_tmpfile = zstrdup(tmpfile);
建立一個臨時檔案,用于儲存master傳回的rdb檔案
開始讀取master傳輸回來的rdb檔案,注冊readsyncbulkpayload回調函數來處理
設定目前的狀态為redis_repl_transfer,并儲存傳輸檔案等中間内容
readsyncbulkpayload函數用于接收master傳輸的rdb檔案,并加載到redis中,大緻流程:
讀取檔案長度
讀取檔案内容,并儲存到本地rdb臨時檔案中
讀取完成之後,清空redis資料庫
加載rdb檔案
建立一個master -> slave的通道,将目前slave作為master的client,以繼續執行master同步過來的指令
将同步狀态改成redis_repl_connected,并回寫同步偏移量等
開啟aof如果需要(server.aof_state != redis_aof_off)
void synccommand(redisclient *c) {
/* ignore sync if already slave or in monitor mode */
if (c->flags & redis_slave) return;
/* refuse sync requests if we are a slave but the link with our master
* is not ok… */
if (server.masterhost && server.repl_state != redis_repl_connected) {
addreplyerror(c,“can’t sync while not connected with my master“);
/* sync can’t be issued when the server has pending data to send to
* the client about already issued commands. we need a fresh reply
* buffer registering the differences between the bgsave and the current
* dataset, so that we can copy to other slaves if needed. */
if (listlength(c->reply) != 0 || c->bufpos != 0) {
addreplyerror(c,“sync and psync are invalid with pending output“);
redislog(redis_notice,“slave asks for synchronization“);
/* try a partial resynchronization if this is a psync command.
* if it fails, we continue with usual full resynchronization, however
* when this happens mastertrypartialresynchronization() already
* replied with:
*
* +fullresync <runid> <offset>
* so the slave knows the new runid and offset to try a psync later
* if the connection with the master is lost. */
if (!strcasecmp(c->argv[0]->ptr,“psync“)) {
if (mastertrypartialresynchronization(c) == redis_ok) {
server.stat_sync_partial_ok++;
return; /* no full resync needed, return. */
char *master_runid = c->argv[1]->ptr;
/* increment stats for failed psyncs, but only if the
* runid is not “?”, as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync. */
if (master_runid[0] != ‘?‘) server.stat_sync_partial_err++;
/* if a slave uses sync, we are dealing with an old implementation
* of the replication protocol (like redis-cli –slave). flag the client
* so that we don’t expect to receive replconf ack feedbacks. */
c->flags |= redis_pre_psync;
/* full resynchronization. */
server.stat_sync_full++;
/* here we need to check if there is a background saving operation
* in progress, or if it is required to start one */
if (server.rdb_child_pid != –1) {
/* ok a background save is in progress. let’s check if it is a good
* one for replication, i.e. if there is another slave that is
* registering differences since the server forked to save */
redisclient *slave;
listnode *ln;
listiter li;
listrewind(server.slaves,&li);
while((ln = listnext(&li))) {
slave = ln->value;
if (slave->replstate == redis_repl_wait_bgsave_end) break;
if (ln) {
/* perfect, the server is already registering differences for
* another slave. set the right state, and copy the buffer. */
copyclientoutputbuffer(c,slave);
c->replstate = redis_repl_wait_bgsave_end;
redislog(redis_notice,“waiting for end of bgsave for sync“);
/* no way, we need to wait for the next bgsave in order to
* register differences */
c->replstate = redis_repl_wait_bgsave_start;
redislog(redis_notice,“waiting for next bgsave for sync“);
/* ok we don’t have a bgsave in progress, let’s start one */
redislog(redis_notice,“starting bgsave for sync“);
if (rdbsavebackground(server.rdb_filename) != redis_ok) {
redislog(redis_notice,“replication failed, can’t bgsave“);
addreplyerror(c,“unable to perform background save“);
/* flush the script cache for the new slave. */
replicationscriptcacheflush();
if (server.repl_disable_tcp_nodelay)
anetdisabletcpnodelay(null, c->fd); /* non critical if it fails. */
c->repldbfd = –1;
c->flags |= redis_slave;
server.slaveseldb = –1; /* force to re-emit the select command. */
listaddnodetail(server.slaves,c);
if (listlength(server.slaves) == 1 && server.repl_backlog == null)
createreplicationbacklog();
首先判斷自己是slave的時候不能執行psync
判斷是否需要全量同步,如果不需要,直接退出
如果需要全量同步,建立一個rdb檔案
如果已經在寫rdb檔案,盡量複用目前的檔案
如果沒有,則發起一個bgsave
判斷是否需要全量同步:
int mastertrypartialresynchronization(redisclient *c) {
long long psync_offset, psync_len;
char buf[128];
int buflen;
/* is the runid of this master the same advertised by the wannabe slave
* via psync? if runid changed this master is a different instance and
* there is no way to continue. */
if (strcasecmp(master_runid, server.runid)) {
/* run id “?” is used by slaves that want to force a full resync. */
if (master_runid[0] != ‘?‘) {
redislog(redis_notice,“partial resynchronization not accepted: “
“runid mismatch (client asked for ‘%s‘, i’m ‘%s‘)“,
master_runid, server.runid);
redislog(redis_notice,“full resync requested by slave.“);
goto need_full_resync;
/* we still have the data our slave is asking for? */
if (getlonglongfromobjectorreply(c,c->argv[2],&psync_offset,null) !=
redis_ok) goto need_full_resync;
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
“unable to partial resync with the slave for lack of backlog (slave request was: %lld).“, psync_offset);
if (psync_offset > server.master_repl_offset) {
“warning: slave tried to psync with an offset that is greater than the master replication offset.“);
/* if we reached this point, we are able to perform a partial resync:
* 1) set client state to make it a slave.
* 2) inform the client we can continue with +continue
* 3) send the backlog data (from the offset to the end) to the slave. */
c->replstate = redis_repl_online;
c->repl_ack_time = server.unixtime;
/* we can’t use the connection buffers since they are used to accumulate
* new commands at this stage. but we are sure the socket send buffer is
* emtpy so this write will never fail actually. */
buflen = snprintf(buf,sizeof(buf),“+continue\r\n“);
if (write(c->fd,buf,buflen) != buflen) {
freeclientasync(c);
psync_len = addreplyreplicationbacklog(c,psync_offset);
“partial resynchronization request accepted. sending %lld bytes of backlog starting from offset %lld.“, psync_len, psync_offset);
/* note that we don’t need to set the selected db at server.slaveseldb
* to -1 to force the master to emit select, since the slave already
* has this state from the previous connection with the master. */
refreshgoodslavescount();
return redis_ok; /* the caller can return, no full resync needed. */
need_full_resync:
/* we need a full resync for some reason… notify the client. */
psync_offset = server.master_repl_offset;
/* add 1 to psync_offset if it the replication backlog does not exists
* as when it will be created later we’ll increment the offset by one. */
if (server.repl_backlog == null) psync_offset++;
/* again, we can’t use the connection buffers (see above). */
buflen = snprintf(buf,sizeof(buf),“+fullresync %s %lld\r\n“,
server.runid,psync_offset);
主要場景有兩個:
目前請求的id和server的id不比對
目前redis儲存的日志無法滿足slave要求的偏移量
master還沒有back log
master back log長度不夠
同時,每次rdb檔案儲存完畢的時候,都會調用updateslaveswaitingbgsave函數,處理儲存的rdb檔案。
void updateslaveswaitingbgsave(int bgsaveerr) {
int startbgsave = 0;
redisclient *slave = ln->value;
if (slave->replstate == redis_repl_wait_bgsave_start) {
startbgsave = 1;
slave->replstate = redis_repl_wait_bgsave_end;
} else if (slave->replstate == redis_repl_wait_bgsave_end) {
// rdb檔案寫入完畢
struct redis_stat buf;
if (bgsaveerr != redis_ok) {
freeclient(slave);
redislog(redis_warning,“sync failed. bgsave child returned an error“);
continue;
// 打開剛寫入的rdb檔案
if ((slave->repldbfd = open(server.rdb_filename,o_rdonly)) == –1 ||
redis_fstat(slave->repldbfd,&buf) == –1) {
redislog(redis_warning,“sync failed. can’t open/stat db after bgsave: %s“, strerror(errno));
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = redis_repl_send_bulk;
aedeletefileevent(server.el,slave->fd,ae_writable);
// 開始發送rdb檔案
if (aecreatefileevent(server.el, slave->fd, ae_writable, sendbulktoslave, slave) == ae_err) {
if (startbgsave) {
/* since we are starting a new background save for one or more slaves,
* we flush the replication script cache to use eval to propagate every
* new evalsha for the first time, since all the new slaves don’t know
* about previous scripts. */
redislog(redis_warning,“sync failed. bgsave failed“);
if (slave->replstate == redis_repl_wait_bgsave_start)
上面的流程結束,slave已經包含了master bgsave時所包含的所有資料。後續就需要master一直将自己的指令發送給slave。
void call(redisclient *c, int flags) {
/* propagate the command into the aof and replication link */
if (flags & redis_call_propagate) {
int flags = redis_propagate_none;
if (c->flags & redis_force_repl) flags |= redis_propagate_repl;
if (c->flags & redis_force_aof) flags |= redis_propagate_aof;
if (dirty)
flags |= (redis_propagate_repl | redis_propagate_aof);
if (flags != redis_propagate_none)
propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
void propagate(struct rediscommand *cmd, int dbid, robj **argv, int argc,
int flags)
if (server.aof_state != redis_aof_off && flags & redis_propagate_aof)
feedappendonlyfile(cmd,dbid,argv,argc);
if (flags & redis_propagate_repl)
replicationfeedslaves(server.slaves,dbid,argv,argc);
在調用任何指令的時候,都會将指令分發到slave上去(除了aof加載或者指令加了redis_cmd_skip_monitor标簽)。
replicationfeedslaves函數主要作用有兩個:
将指令發送給所有線上的slave
將指令寫入到back log中,方便後續增量同步
void replicationfeedslaves(list *slaves, int dictid, robj **argv, int argc) {
int j, len;
char llstr[redis_longstr_size];
/* if there aren’t slaves, and there is no backlog buffer to populate,
* we can return asap. */
if (server.repl_backlog == null && listlength(slaves) == 0) return;
/* we can’t have slaves attached and no backlog. */
redisassert(!(listlength(slaves) != 0 && server.repl_backlog == null));
/* send select command to every slave if needed. */
if (server.slaveseldb != dictid) {
robj *selectcmd;
/* for a few dbs we have pre-computed select command. */
// 每次都增加一個select指令,防止弄錯db
if (dictid >= 0 && dictid < redis_shared_select_cmds) {
selectcmd = shared.select[dictid];
int dictid_len;
dictid_len = ll2string(llstr,sizeof(llstr),dictid);
selectcmd = createobject(redis_string,
sdscatprintf(sdsempty(),
“*2\r\n$6\r\nselect\r\n$%d\r\n%s\r\n“,
dictid_len, llstr));
/* add the select command into the backlog. */
// 将select指令寫入到backlog中
if (server.repl_backlog) feedreplicationbacklogwithobject(selectcmd);
/* send it to slaves. */
// 将select指令發送給slave
listrewind(slaves,&li);
addreply(slave,selectcmd);
if (dictid < 0 || dictid >= redis_shared_select_cmds)
decrrefcount(selectcmd);
server.slaveseldb = dictid;
/* write the command to the replication backlog if any. */
// 将指令寫入到backlog中
if (server.repl_backlog) {
char aux[redis_longstr_size+3];
/* add the multi bulk reply length. */
aux[0] = ‘*‘;
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = ‘\r‘;
aux[len+2] = ‘\n‘;
feedreplicationbacklog(aux,len+3);
for (j = 0; j < argc; j++) {
long objlen = stringobjectlen(argv[j]);
/* we need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..crlf payload len
* ad add the final crlf */
aux[0] = ‘$‘;
len = ll2string(aux+1,sizeof(aux)-1,objlen);
feedreplicationbacklogwithobject(argv[j]);
feedreplicationbacklog(aux+len+1,2);
/* write the command to every slave. */
// 将指令發送到所有的slave
/* don’t feed slaves that are still waiting for bgsave to start */
if (slave->replstate == redis_repl_wait_bgsave_start) continue;
/* feed slaves that are waiting for the initial sync (so these commands
* are queued in the output buffer until the initial sync completes),
* or are already in sync with the master. */
/* add the multi bulk length. */
addreplymultibulklen(slave,argc);
/* finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
for (j = 0; j < argc; j++)
addreplybulk(slave,argv[j]);
注:backlog大小可以設定,預設的大小為1MB;如果寫入的內容超過這個大小,會覆蓋最舊的日誌
#define redis_default_repl_backlog_size (1024*1024) /* 1mb */
在指令傳播階段,slave每秒一次向master發送replconf指令,發送目前的offset,讓master檢測是否有指令丢失。 這個也是在定時器中發送的。
if (server.masterhost && server.master &&
!(server.master->flags & redis_pre_psync))
replicationsendack();
void replicationsendack(void) {
redisclient *c = server.master;
if (c != null) {
c->flags |= redis_master_force_reply;
addreplymultibulklen(c,3);
addreplybulkcstring(c,“replconf“);
addreplybulkcstring(c,“ack“);
addreplybulklonglong(c,c->reploff);
c->flags &= ~redis_master_force_reply;
同時,master在接收到這個ack包的時候,會記錄slave的ack offset和ack時間:
else if (!strcasecmp(c->argv[j]->ptr,“ack“)) {
/* replconf ack is used by slave to inform the master the amount
* of replication stream that it processed so far. it is an
* internal only command that normal clients should never use. */
long long offset;
if (!(c->flags & redis_slave)) return;
if ((getlonglongfromobject(c->argv[j+1], &offset) != redis_ok))
if (offset > c->repl_ack_off)
c->repl_ack_off = offset;
/* note: this command does not reply anything! */
還是在定時器中,每次調用的時候都會清理已經逾時的slave:
/* disconnect timedout slaves. */
if (listlength(server.slaves)) {
if (slave->replstate != redis_repl_online) continue;
if (slave->flags & redis_pre_psync) continue;
if ((server.unixtime – slave->repl_ack_time) > server.repl_timeout)
char ip[redis_ip_str_len];
int port;
if (anetpeertostring(slave->fd,ip,sizeof(ip),&port) != –1) {
“disconnecting timedout slave: %s:%d“,
ip, slave->slave_listening_port);
這裡的repl_ack_time由slave每次發送的ack包寫入,server.repl_timeout預設值是60s:
#define redis_repl_timeout 60
master斷開了與slave的連接之後,為了讓slave之後能夠進行增量同步,freeclient的實作對slave上代表master的client(帶有redis_master标志)有不同的處理:
void freeclient(redisclient *c) {
/* if it is our master that’s beging disconnected we should make sure
* to cache the state to try a partial resynchronization later.
* note that before doing this we make sure that the client is not in
* some unexpected state, by checking its flags. */
if (server.master && c->flags & redis_master) {
redislog(redis_warning,“connection with master lost.“);
if (!(c->flags & (redis_close_after_reply|
redis_close_asap|
redis_blocked|
redis_unblocked)))
replicationcachemaster(c);
void replicationcachemaster(redisclient *c) {
redisassert(server.master != null && server.cached_master == null);
redislog(redis_notice,“caching the disconnected master state.“);
/* remove from the list of clients, we don’t want this client to be
* listed by client list or processed in any way by batch operations. */
// 首先将slave從client清單中删除
ln = listsearchkey(server.clients,c);
redisassert(ln != null);
listdelnode(server.clients,ln);
/* save the master. server.master will be set to null later by
* replicationhandlemasterdisconnection(). */
// 把slave的master儲存到cached_master中
server.cached_master = server.master;
/* remove the event handlers and close the socket. we’ll later reuse
* the socket of the new connection with the master during psync. */
// 清理slave連接配接,釋放資源
aedeletefileevent(server.el,c->fd,ae_readable);
aedeletefileevent(server.el,c->fd,ae_writable);
close(c->fd);
/* set fd to -1 so that we can safely call freeclient(c) later. */
c->fd = –1;
/* invalidate the peer id cache. */
if (c->peerid) {
sdsfree(c->peerid);
c->peerid = null;
·
/* caching the master happens instead of the actual freeclient() call,
* so make sure to adjust the replication state. this function will
* also set server.master to null. */
replicationhandlemasterdisconnection();
void replicationhandlemasterdisconnection(void) {
server.master = null;
server.repl_down_since = server.unixtime;
/* we lost connection with our master, force our slaves to resync
* with us as well to load the new data set.
* if server.masterhost is null the user called slaveof no one so
* slave resync is not needed. */
if (server.masterhost != null) disconnectslaves();
經過這些處理,一個斷開連接的slave,複製狀態變成了redis_repl_connect。按照之前的流程,定時器會去嘗試連接master,發送ping指令,然後再發送psync指令的時候,由於已經有了cached_master,會在psync指令中帶上之前master的id和偏移量。相關slave和master的處理邏輯,前面代碼中已經有了。
轉載自:https://coolex.info/blog/463.html