天天看點

Redis Cluster 的實作 - 加入叢集節點

對于叢集初始化,在 redis cluster 形成之前,各個節點都是獨立的,它們主要是通過節點之間的 CLUSTER MEET 指令來初始化各個節點中的 clusterState 中的 nodes 成員,并建構最終的 cluster,cluster meet 的指令格式如下:

  CLUSTER MEET <ip> <port>      

  下面以 NodeA 和 NodeB 兩個節點逐漸組成叢集為例,用戶端登入到 NodeA,并執行 MEET 指令将 NodeB 節點加入到 NodeA 的叢集中。

  1) 接收到用戶端發起的 MEET 指令的節點A(NodeA),會向 <ip>:<port> 的節點B(NodeB) 發起 Handshake 流程,如下代碼:

void clusterCommand(redisClient *c) {
    if (server.cluster_enabled == 0) {
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }
    
    if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
        // 接收到從 client 執行的 cluster meet <ip> <port> 指令
        long long port;
        
        // 擷取 NodeB 的 Port(注:非 cluster bus 的監聽端口)
        if (getLongLongFromObject(c->argv[3], &port) != REDIS_OK) {
            addReplyErrorFormat(c,"Invalid TCP port specified: %s",
                                (char*)c->argv[3]->ptr);
            return;
        }
        
        // 開始發起 handshake 流程
        if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 &&
            errno == EINVAL)
        {
            addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
                            (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
        } else {
            addReply(c,shared.ok);
        }
    }
    
    ... ...
}      

  從上面代碼可以看出發起 handshake 流程是在 clusterStartHandshake 函數中實作的:

int clusterStartHandshake(char *ip, int port) {
    clusterNode *n;
    char norm_ip[REDIS_IP_STR_LEN];
    struct sockaddr_storage sa;
    /* IP 和 端口 檢查 */
    ... ...
    /* IP 位址 normalize */
    ... ...
    
    /* 将節點 NodeB 加入 Handshake 階段 */
    if (clusterHandshakeInProgress(norm_ip,port)) {
        errno = EAGAIN;
        return 0;
    }
    /* Add the node with a random address (NULL as first argument to
     * createClusterNode()). Everything will be fixed during the
     * handskake. */
    // 建立新的節點 NodeB,同時設定 flag 為 HANDSHAKE|MEET
    // 并添加至 NodeA 本地維護的 cluster nodes 表中
    n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
    memcpy(n->ip,norm_ip,sizeof(n->ip));
    n->port = port;
    clusterAddNode(n);
    return 1;
}      

這裡的代碼并沒有去發送 MEET 消息至 NodeB,而是在 clusterCron 中實作的,clusterCron 函數每秒被執行 10 次,如下(省略了處理心跳等其他流程):

/* This is executed 10 times every second */
void clusterCron(void) {
    dictIterator *di;
    dictEntry *de;
    int update_state = 0;
    int orphaned_masters; /* How many masters there are without ok slaves. */
    int max_slaves; /* Max number of ok slaves for a single master. */
    int this_slaves; /* Number of ok slaves for our master (if we are slave). */
    mstime_t min_pong = 0, now = mstime();
    clusterNode *min_pong_node = NULL;
    ... ...
    
    /* Check if we have disconnected nodes and re-establish the connection. */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        ... ...
        
        if (node->link == NULL) {
            // 對于首次握手,節點之間是沒有鍊路的
            int fd;
            mstime_t old_ping_sent;
            clusterLink *link;
            
            // 建立一個新的 NodeA 和 NodeB 的 cluster bus 鍊路
            // 對端端口為 node->port + 10000
            // 并設定此鍊路的 READ 事件處理器為 clusterReadHandler
            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
                node->port+REDIS_CLUSTER_PORT_INCR, REDIS_BIND_ADDR);
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            aeCreateFileEvent(server.el,link->fd,AE_READABLE,
                    clusterReadHandler,link);
                    
            /* Queue a PING in the new connection ASAP: this is crucial
             * to avoid false positives in failure detection.
             *
             * If the node is flagged as MEET, we send a MEET message instead
             * of a PING one, to force the receiver to add us in its node
             * table. */
            // 對于 NodeA 和 NodeB 之間的首次連接配接,flag 設定的為 MEET
            // 這裡向 NodeB 發送 MEET 消息
            old_ping_sent = node->ping_sent;
            clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
                    CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
            if (old_ping_sent) {
                /* If there was an active ping before the link was
                 * disconnected, we want to restore the ping time, otherwise
                 * replaced by the clusterSendPing() call. */
                node->ping_sent = old_ping_sent;
            }
            
            /* We can clear the flag after the first packet is sent.
             * If we'll never receive a PONG, we'll never send new packets
             * to this node. Instead after the PONG is received and we
             * are no longer in meet/handshake status, we want to send
             * normal PING packets. */
            node->flags &= ~REDIS_NODE_MEET;
            redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
                    node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
        }
    }
    dictReleaseIterator(di);
    ... ...
}      

  2) 而接收到此 握手請求 MEET 消息的節點 NodeB,首先将 NodeA 加入到 NodeB 維護的 cluster nodes 表中,并傳回 PONG 消息,但是此時 NodeB 中維護的 NodeA 資訊的 flag 仍然為 HANDSHAKE 階段(因為 NodeB 此時并不确認 NodeA 已經接收到 PONG 響應),如下代碼:

    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
        redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node);
        
        /* We use incoming MEET messages in order to set the address
         * for 'myself', since only other cluster nodes will send us
         * MEET messagses on handshakes, when the cluster joins, or
         * later if we changed address, and those nodes will use our
         * official address to connect to us. So by obtaining this address
         * from the socket is a simple way to discover / update our own
         * address in the cluster without it being hardcoded in the config. */
        if (type == CLUSTERMSG_TYPE_MEET) {
            char ip[REDIS_IP_STR_LEN];
            
            // 通過外部節點連接配接的本節點位址來更新本節點對外的 IP 位址
            // 這裡的 myself 為全局引用,最終指向 server.cluster->myself
            // 代表本節點自身
            if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 &&
                strcmp(ip,myself->ip))
            {
                memcpy(myself->ip,ip,REDIS_IP_STR_LEN);
                redisLog(REDIS_WARNING,"IP address for this node updated to %s",
                    myself->ip);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
            }
        }
        /* Add this node if it is new for us and the msg type is MEET.
         * In this stage we don't try to add the node with the right
         * flags, slaveof pointer, and so forth, as this details will be
         * resolved when we'll receive PONGs from the node. */
        if (!sender && type == CLUSTERMSG_TYPE_MEET) {
            clusterNode *node;
            
            // 建立新的 cluster 節點
            // 且 flags 設定為 REDIS_NODE_HANDSHAKE
            // 表示目前仍然處于 握手階段
            // 需要等接收到 PONG 消息後才将此節點的
            node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
            nodeIp2String(node->ip,link);
            node->port = ntohs(hdr->port);
            clusterAddNode(node);
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
        }
        /* Get info from the gossip section */
        clusterProcessGossipSection(hdr,link);
        /* Anyway reply with a PONG */
        clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
    }      
    /* PING or PONG: process config information. */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        if (link->node) {
            if (nodeInHandshake(link->node)) {
                /* If we already have this node, try to change the
                 * IP/port of the node with the new one. */
                // 如果之前已經有過連接配接,則會釋放該鍊路
                if (sender) {
                    if (nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port)))
                    {
                        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                             CLUSTER_TODO_UPDATE_STATE);
                    }
                    
                    /* Free this node as we already have it. This will
                     * cause the link to be freed as well. */
                    freeClusterNode(link->node);
                    return 0;
                }
                /* First thing to do is replacing the random name with the
                 * right node name if this was a handshake stage. */
                // 節點重命名,因為之前對于 NodeB 的 name 是采用的随機命名(clusterCreateNode)
                // 這裡以 NodeB 過來的實際名稱為準
                clusterRenameNode(link->node, hdr->sender);
                link->node->flags &= ~REDIS_NODE_HANDSHAKE; // 不在處于 HANDSHAKE 狀态
                link->node->flags |= flags&(REDIS_NODE_MASTER|REDIS_NODE_SLAVE);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
            }
            ... ...
        } 
        ... ...
    }      

繼續閱讀