對於叢集初始化:在 redis cluster 形成之前,各個節點都是獨立的,它們主要是透過 CLUSTER MEET 指令讓節點之間互相握手,以此初始化各個節點 clusterState 中的 nodes 成員,並構建出最終的 cluster。CLUSTER MEET 的指令格式如下:
CLUSTER MEET <ip> <port>
下面以 NodeA 和 NodeB 兩個節點逐步組成叢集為例:用戶端連線到 NodeA,並執行 MEET 指令將 NodeB 節點加入到 NodeA 所在的叢集中。
1) 節點 A(NodeA)接收到用戶端發起的 MEET 指令後,會向位於 <ip>:<port> 的節點 B(NodeB)發起 Handshake 流程,代碼如下:
/* Handler for the CLUSTER command (excerpt: only the MEET subcommand is
 * shown; the other subcommands are elided).
 *
 * CLUSTER MEET <ip> <port>: ask this node (NodeA) to start a handshake with
 * the node at <ip>:<port> (NodeB). An error is replied only when
 * clusterStartHandshake() returns 0 with errno == EINVAL. Every other
 * outcome, including errno == EAGAIN for a handshake that is already in
 * progress, is answered with +OK. */
void clusterCommand(redisClient *c) {
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}
if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
// CLUSTER MEET <ip> <port> sent by a client.
long long port;
// Parse NodeB's port (note: this is the client-facing port, not the
// cluster bus port).
if (getLongLongFromObject(c->argv[3], &port) != REDIS_OK) {
addReplyErrorFormat(c,"Invalid TCP port specified: %s",
(char*)c->argv[3]->ptr);
return;
}
// Start the handshake. Nothing is sent on the network yet; NodeB is only
// registered locally as a handshake node.
// NOTE(review): port is a long long but clusterStartHandshake() takes an
// int, so very large values are narrowed before its port check runs.
if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 &&
errno == EINVAL)
{
addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
(char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
} else {
addReply(c,shared.ok);
}
}
... ...
}
從上面的代碼可以看出,發起 handshake 流程是在 clusterStartHandshake 函數中實作的:
/* Start a handshake with the node at ip:port (excerpt: the validation and
 * normalization steps are elided).
 *
 * No packet is sent here. The function only adds a placeholder node, flagged
 * REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET, to the local node table.
 *
 * Returns 1 when the placeholder node was added. Returns 0 with
 * errno == EAGAIN when a handshake with the same normalized ip:port is
 * already in progress. The elided checks presumably return 0 with
 * errno == EINVAL for an invalid address, which is what the CLUSTER MEET
 * handler tests for. */
int clusterStartHandshake(char *ip, int port) {
clusterNode *n;
char norm_ip[REDIS_IP_STR_LEN];
struct sockaddr_storage sa;
/* Validate the IP address and the port. */
... ...
/* Normalize the IP address (the result is used below as norm_ip). */
... ...
/* If a handshake with this ip:port is already in progress, do not add
 * the node a second time. */
if (clusterHandshakeInProgress(norm_ip,port)) {
errno = EAGAIN;
return 0;
}
/* Add the node with a random name (NULL as first argument to
 * createClusterNode()). Everything will be fixed during the
 * handshake. */
// Create the placeholder node for NodeB with flags HANDSHAKE|MEET and add
// it to the cluster nodes table that NodeA maintains locally.
n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
memcpy(n->ip,norm_ip,sizeof(n->ip));
n->port = port;
clusterAddNode(n);
return 1;
}
這裡的代碼並沒有直接發送 MEET 消息給 NodeB,實際的發送是在 clusterCron 中完成的。clusterCron 函數每秒被執行 10 次,代碼如下(省略了處理心跳等其他流程):
/* This is executed 10 times every second */
/* Periodic cluster maintenance (excerpt: heartbeat handling and the other
 * steps are elided).
 *
 * For every known node that has no cluster bus link yet, for example the
 * placeholder node added by clusterStartHandshake(), this:
 *   - opens a non-blocking connection to node->port + REDIS_CLUSTER_PORT_INCR;
 *   - registers clusterReadHandler on it;
 *   - queues a MEET (when the node is flagged REDIS_NODE_MEET) or a PING.
 * If the connect attempt fails immediately, node->link stays NULL and the
 * connection is retried on a later run. */
void clusterCron(void) {
    dictIterator *di;
    dictEntry *de;
    int update_state = 0;
    int orphaned_masters; /* How many masters there are without ok slaves. */
    int max_slaves; /* Max number of ok slaves for a single master. */
    int this_slaves; /* Number of ok slaves for our master (if we are slave). */
    mstime_t min_pong = 0, now = mstime();
    clusterNode *min_pong_node = NULL;
    ... ...
    /* Check if we have disconnected nodes and re-establish the connection. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        ... ...
        if (node->link == NULL) {
            /* On the very first handshake there is no link between the two
             * nodes yet. */
            int fd;
            mstime_t old_ping_sent;
            clusterLink *link;

            /* Open a new cluster bus connection to the peer. The bus port is
             * the peer's client port + REDIS_CLUSTER_PORT_INCR. */
            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
                node->port+REDIS_CLUSTER_PORT_INCR, REDIS_BIND_ADDR);
            if (fd == -1) {
                /* The connect failed synchronously, before clusterSendPing()
                 * had a chance to be called. There is no socket to wrap in a
                 * link, so skip this node. node->link stays NULL, so a later
                 * run retries. If node->ping_sent is zero, failure detection
                 * can't work, so claim that a ping was sent now. */
                if (node->ping_sent == 0) node->ping_sent = mstime();
                redisLog(REDIS_DEBUG, "Unable to connect to "
                    "Cluster Node [%s]:%d -> %s", node->ip,
                    node->port+REDIS_CLUSTER_PORT_INCR,
                    server.neterr);
                continue;
            }
            /* Attach a link for the new socket to this node. Incoming data on
             * it is handled by clusterReadHandler. */
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            aeCreateFileEvent(server.el,link->fd,AE_READABLE,
                    clusterReadHandler,link);
            /* Queue a PING in the new connection ASAP: this is crucial
             * to avoid false positives in failure detection.
             *
             * If the node is flagged as MEET, we send a MEET message instead
             * of a PING one, to force the receiver to add us in its node
             * table. */
            /* On the first connection between NodeA and NodeB, the node is
             * flagged MEET, so a MEET message is sent to NodeB here. */
            old_ping_sent = node->ping_sent;
            clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
                    CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
            if (old_ping_sent) {
                /* If there was an active ping before the link was
                 * disconnected, we want to restore the ping time, otherwise
                 * replaced by the clusterSendPing() call. */
                node->ping_sent = old_ping_sent;
            }
            /* We can clear the flag after the first packet is sent.
             * If we'll never receive a PONG, we'll never send new packets
             * to this node. Instead after the PONG is received and we
             * are no longer in meet/handshake status, we want to send
             * normal PING packets. */
            node->flags &= ~REDIS_NODE_MEET;
            redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
                    node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
        }
    }
    dictReleaseIterator(di);
    ... ...
}
2) 接收到此 MEET 握手請求的節點 NodeB,首先將 NodeA 加入到 NodeB 自己維護的 cluster nodes 表中,並回覆 PONG 消息。但此時 NodeB 中記錄的 NodeA 的 flag 仍然是 HANDSHAKE(因為 NodeB 此時尚未收到來自 NodeA 的 PONG,無法確認握手已經完成;NodeA 的名稱等資訊要等收到 PONG 之後才能確定,目前只是先以隨機名稱建立),代碼如下:
/* Excerpt from the incoming-packet processing code: handling of PING and
 * MEET packets. `type`, `link`, `hdr` and `sender` come from the elided
 * surrounding code. `sender` appears to be the already-known node that sent
 * the packet, or NULL when the sender is unknown. Confirm this against the
 * full source. */
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node);
/* We use incoming MEET messages in order to set the address
 * for 'myself', since only other cluster nodes will send us
 * MEET messages on handshakes, when the cluster joins, or
 * later if we changed address, and those nodes will use our
 * official address to connect to us. So by obtaining this address
 * from the socket is a simple way to discover / update our own
 * address in the cluster without it being hardcoded in the config. */
if (type == CLUSTERMSG_TYPE_MEET) {
char ip[REDIS_IP_STR_LEN];
// Update this node's public IP with the local address of the socket the
// peer connected to. `myself` refers to this node itself (it points to
// server.cluster->myself). A config save is scheduled only when the IP
// actually changed.
if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 &&
strcmp(ip,myself->ip))
{
memcpy(myself->ip,ip,REDIS_IP_STR_LEN);
redisLog(REDIS_WARNING,"IP address for this node updated to %s",
myself->ip);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
}
/* Add this node if it is new for us and the msg type is MEET.
 * In this stage we don't try to add the node with the right
 * flags, slaveof pointer, and so forth, as this details will be
 * resolved when we'll receive PONGs from the node. */
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;
// Create the new cluster node with only REDIS_NODE_HANDSHAKE set (and a
// random name, since the first argument is NULL). The node is still in
// the handshake stage; its real name and flags are filled in later,
// once PONGs are received from it.
node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
// NOTE(review): presumably copies the remote IP of this link into node->ip.
nodeIp2String(node->ip,link);
// The port advertised in the packet header (network byte order).
node->port = ntohs(hdr->port);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
/* Get info from the gossip section */
clusterProcessGossipSection(hdr,link);
/* Anyway reply with a PONG */
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
}
/* PING or PONG: process config information. */
/* Excerpt: this branch also runs for MEET packets. The part shown handles a
 * link whose node is still in the handshake stage. `flags`, `hdr` and
 * `sender` come from the elided surrounding code. */
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET)
{
if (link->node) {
if (nodeInHandshake(link->node)) {
/* If we already have this node, try to change the
 * IP/port of the node with the new one. */
// If the sender is already known under its real name, the handshake
// node on this link is a duplicate. Update the known node's address
// if needed, then free the handshake node.
if (sender) {
if (nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port)))
{
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
/* Free this node as we already have it. This will
 * cause the link to be freed as well. */
// `link` must not be touched after this call, hence the immediate
// return.
freeClusterNode(link->node);
return 0;
}
/* First thing to do is replacing the random name with the
 * right node name if this was a handshake stage. */
// Rename the node: it was created with a random name
// (createClusterNode(NULL, ...)). Use the real name carried in the
// packet header instead.
clusterRenameNode(link->node, hdr->sender);
link->node->flags &= ~REDIS_NODE_HANDSHAKE; // no longer in the HANDSHAKE state
// Adopt the MASTER/SLAVE role bits from `flags` (presumably the
// sender's flags from the packet header; confirm in the elided code).
link->node->flags |= flags&(REDIS_NODE_MASTER|REDIS_NODE_SLAVE);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
... ...
}
... ...
}