天天看點

Nacos源碼分析二十二、資料一致性同步-CP

然後是RaftConsistencyServiceImpl的CP(強一致性)實作。

同樣是put方法:

/**
 * Writes a record through the Raft (CP) consistency protocol.
 *
 * <p>Delegates to {@code raftCore.signalPublish}; any failure is logged and rethrown as a
 * {@link NacosException} with {@code SERVER_ERROR}, keeping the original exception as cause.
 *
 * @param key   datum key
 * @param value record to store
 * @throws NacosException if the Raft publish fails for any reason
 */
@Override
public void put(String key, Record value) throws NacosException {
    try {
        raftCore.signalPublish(key, value);
    } catch (Exception cause) {
        Loggers.RAFT.error("Raft put failed.", cause);
        final String message = "Raft put failed, key:" + key + ", value:" + value;
        throw new NacosException(NacosException.SERVER_ERROR, message, cause);
    }
}
           

然後是raftCore.signalPublish:

/**
 * Publishes {@code value} under {@code key} through Raft.
 *
 * <p>On a follower the request is forwarded to the leader's {@code API_PUB} endpoint
 * (/v1/ns/raft/datum) and the method returns immediately. On the leader, while holding
 * {@code OPERATE_LOCK}:
 * <ul>
 *   <li>the datum gets a new timestamp;</li>
 *   <li>it is applied locally via {@link #onPublish};</li>
 *   <li>it is pushed asynchronously to every other peer's {@code API_ON_PUB} endpoint
 *       (/v1/ns/raft/datum/commit).</li>
 * </ul>
 * The write succeeds only when a majority, counting the leader itself, acknowledges with
 * HTTP 200 within {@code UtilsAndCommons.RAFT_PUBLISH_TIMEOUT} milliseconds.
 *
 * @param key   datum key
 * @param value record to publish
 * @throws IllegalStateException if no leader is known on a follower, or if a majority did not
 *                               acknowledge in time
 * @throws Exception             propagated from forwarding, local apply or waiting
 */
public void signalPublish(String key, Record value) throws Exception {
    
    if (!isLeader()) {
        // Follower: forward the write to the leader (/v1/ns/raft/datum).
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);
        
        final RaftPeer leader = getLeader();
        // NOTE(review): getLeader() is presumably null while no leader is known (e.g. during an
        // election) — fail with a descriptive error instead of a NullPointerException on leader.ip.
        if (leader == null) {
            throw new IllegalStateException("no leader available to forward publish, key=" + key);
        }
        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }
    
    // Leader path. Acquire the lock BEFORE entering try: if lock() failed inside the try, the
    // finally block would unlock() a lock this thread does not hold and throw
    // IllegalMonitorStateException, masking the original error.
    OPERATE_LOCK.lock();
    try {
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        // Look up the current datum once instead of twice.
        final Datum current = getDatum(key);
        if (current == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(current.timestamp.incrementAndGet());
        }
        
        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
        
        // Apply the change locally first; peers.local() is this node, i.e. the leader.
        onPublish(datum, peers.local());
        
        final String content = json.toString();
        
        // CP semantics: the write is successful only once a majority of nodes has accepted it.
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        for (final String server : peers.allServersIncludeMyself()) {
            // The leader counts as one acknowledgement without a network round trip.
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            // /v1/ns/raft/datum/commit
            final String url = buildUrl(server, API_ON_PUB);
            // Replicate asynchronously; only HTTP 200 responses count towards the majority.
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content,
                    new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT
                                        .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                                datum.key, server, response.getStatusCode());
                                return 1;
                            }
                            latch.countDown();
                            return 0;
                        }
                        
                        @Override
                        public STATE onContentWriteCompleted() {
                            return STATE.CONTINUE;
                        }
                    });
            
        }
        
        // Wait for the majority, bounded by RAFT_PUBLISH_TIMEOUT ms (presumably 5 s — confirm).
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            // only majority servers return success can we consider this update success
            Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
            throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
        }
        
        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        OPERATE_LOCK.unlock();
    }
}
           

此時有兩個分支:當前節點是leader的情況,以及當前節點不是leader的情況。

當前節點不是leader

此時通過raftProxy.proxyPostLarge將請求轉發給leader,由leader上RaftController的publish方法(/v1/ns/raft/datum)接收:

/**
 * Receives a publish request forwarded to the leader (via {@code raftProxy.proxyPostLarge}) and
 * routes it into {@code raftConsistencyService.put}.
 *
 * <p>The request body is URL-decoded JSON with a {@code key} field and a {@code value} field;
 * the key decides which concrete type {@code value} is deserialized into.
 *
 * @return {@code "ok"} once the put completes
 * @throws NacosException with {@code INVALID_PARAM} when the key matches no known type
 */
@PostMapping("/datum")
public String publish(HttpServletRequest request, HttpServletResponse response) throws Exception {
    
    response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
    response.setHeader("Cache-Control", "no-cache");
    response.setHeader("Content-Encode", "gzip");
    
    final String body = IoUtils.toString(request.getInputStream(), "UTF-8");
    final JsonNode payload = JacksonUtils.toObj(URLDecoder.decode(body, "UTF-8"));
    final String key = payload.get("key").asText();
    
    // The key prefix selects the record type; "value" is only read once a type has matched.
    if (KeyBuilder.matchInstanceListKey(key)) {
        raftConsistencyService.put(key, JacksonUtils.toObj(payload.get("value").toString(), Instances.class));
    } else if (KeyBuilder.matchSwitchKey(key)) {
        raftConsistencyService.put(key, JacksonUtils.toObj(payload.get("value").toString(), SwitchDomain.class));
    } else if (KeyBuilder.matchServiceMetaKey(key)) {
        raftConsistencyService.put(key, JacksonUtils.toObj(payload.get("value").toString(), Service.class));
    } else {
        throw new NacosException(NacosException.INVALID_PARAM, "unknown type publish key: " + key);
    }
    return "ok";
}
           

可以看到就是調用raftConsistencyService.put方法,回到了最初開始的地方。不同的是,此時處理請求的是leader節點,isLeader()為true,因此會進入leader分支。

當前節點是leader時

首先調用onPublish,在本機應用這條資料:

/**
 * Applies a datum published by the Raft leader to this node.
 *
 * <p>Invoked on the leader itself from signalPublish, and on other nodes through the
 * /datum/commit handler via onPut. Steps, in order:
 * <ol>
 *   <li>validate the request;</li>
 *   <li>call {@code local.resetLeaderDue()};</li>
 *   <li>persist the datum when its key is a persistent key;</li>
 *   <li>cache it in {@code datums};</li>
 *   <li>advance or align the local term and persist that term;</li>
 *   <li>queue an {@code ApplyAction.CHANGE} task on the notifier.</li>
 * </ol>
 *
 * @param datum  datum to apply; its value must be non-null
 * @param source peer that published the datum; must be the current leader
 * @throws IllegalStateException if the datum value is null, the source is not the leader, or
 *                               the source's term is older than the local term
 * @throws Exception             propagated from raftStore
 */
public void onPublish(Datum datum, RaftPeer source) throws Exception {
    RaftPeer local = peers.local();
    if (datum.value == null) {
        Loggers.RAFT.warn("received empty datum");
        throw new IllegalStateException("received empty datum");
    }

    // Only the current leader is allowed to publish data.
    if (!peers.isLeader(source.ip)) {
        Loggers.RAFT
                .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
                        JacksonUtils.toJson(getLeader()));
        throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
    }

    // Reject a publish whose term is older than ours (stale leader).
    if (source.term.get() < local.term.get()) {
        Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
                JacksonUtils.toJson(local));
        throw new IllegalStateException(
                "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
    }

    // NOTE(review): this does not touch the term (that happens below); by its name it resets
    // the leader timeout deadline — confirm against RaftPeer.resetLeaderDue.
    local.resetLeaderDue();
    
    // if data should be persisted, usually this is true:
    if (KeyBuilder.matchPersistentKey(datum.key)) {
        raftStore.write(datum);
    }

    // Cache the datum in memory.
    datums.put(datum.key, datum);
    
    // Term update. Leader: bump its own term by PUBLISH_TERM_INCREASE_COUNT.
    // Other node: if bumping would overtake the source's term, adopt the source's term (also
    // recording it on the cached leader peer); otherwise bump the local term.
    if (isLeader()) {
        local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
    } else {
        if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
            //set leader term:
            getLeader().term.set(source.term.get());
            local.term.set(getLeader().term.get());
        } else {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        }
    }
    // Persist the resulting term.
    raftStore.updateTerm(local.term.get());

    // Queue a CHANGE task for this key on the notifier.
    notifier.addTask(datum.key, ApplyAction.CHANGE);
    
    Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
           

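這裡做了幾件事

  1. 校驗:資料不能為空,來源節點必須是leader,且來源的任期term不能小於本地任期,否則拋出IllegalStateException。
  2. 調用local.resetLeaderDue()重置leader的逾時時間(注意這一步並不修改任期term)。
  3. 如果資料需要持久化(KeyBuilder.matchPersistentKey),則通過raftStore.write(datum)將資料寫入磁碟。
  4. 在本地datums中緩存資料。
  5. 更新任期term:leader將自身任期加上PUBLISH_TERM_INCREASE_COUNT,非leader節點則根據來源leader的任期調整本地任期,最後通過raftStore.updateTerm持久化任期。
  6. notifier添加CHANGE任務通知。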

一緻性同步

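leader將資料同步給叢集中的其他所有節點,調用的是/v1/ns/raft/datum/commit接口。CountDownLatch的初始值是peers.majorityCount(),即叢集節點數/2+1。leader自身先計一次,其餘節點每返回一次HTTP 200就countDown一次。只有在逾時時間內過半節點同步成功,這次寫入才算成功,否則拋出IllegalStateException。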

// Excerpt of the leader path of signalPublish: replicate the datum to peers and wait for a majority.
final String content = json.toString();

// CP semantics: the put succeeds only after a majority of nodes has accepted the data.
// The latch starts at the majority count.
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
// Iterate over every node, including this one.
for (final String server : peers.allServersIncludeMyself()) {
    // The leader counts itself once, without a network call.
    if (isLeader(server)) {
        latch.countDown();
        continue;
    }
    // /v1/ns/raft/datum/commit
    final String url = buildUrl(server, API_ON_PUB);
    // Replicate asynchronously; only an HTTP 200 response counts down the latch.
    HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content,
            new AsyncCompletionHandler<Integer>() {
                @Override
                public Integer onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        Loggers.RAFT
                                .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                        datum.key, server, response.getStatusCode());
                        return 1;
                    }
                    // Peer accepted the datum: count it towards the majority.
                    latch.countDown();
                    return 0;
                }
                
                @Override
                public STATE onContentWriteCompleted() {
                    return STATE.CONTINUE;
                }
            });
    
}

// Wait for the majority, bounded by RAFT_PUBLISH_TIMEOUT ms (the article states 5 seconds).
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
    // only majority servers return success can we consider this update success
    Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
    throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}
           

我們看一下同步資料的接收方,RaftController的onPublish方法:

/**
 * Receives a datum replicated by the leader (posted by signalPublish to API_ON_PUB,
 * /v1/ns/raft/datum/commit) and applies it locally via {@code raftConsistencyService.onPut}.
 *
 * <p>The request body is URL-decoded JSON with a {@code source} peer and a {@code datum}
 * object. The datum's {@code key} selects the concrete value type used for deserialization.
 *
 * @return {@code "ok"} once the datum has been applied
 * @throws NacosException with {@code INVALID_PARAM} if the datum or its key is missing, or the
 *                        key matches no known type
 */
@PostMapping("/datum/commit")
public String onPublish(HttpServletRequest request, HttpServletResponse response) throws Exception {
    
    response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
    response.setHeader("Cache-Control", "no-cache");
    response.setHeader("Content-Encode", "gzip");
    
    String entity = IoUtils.toString(request.getInputStream(), "UTF-8");
    String value = URLDecoder.decode(entity, "UTF-8");
    
    JsonNode jsonObject = JacksonUtils.toObj(value);
    
    RaftPeer source = JacksonUtils.toObj(jsonObject.get("source").toString(), RaftPeer.class);
    JsonNode datumJson = jsonObject.get("datum");
    // JsonNode.get returns null for a missing field; reject malformed requests explicitly.
    JsonNode keyNode = datumJson == null ? null : datumJson.get("key");
    if (keyNode == null) {
        throw new NacosException(NacosException.INVALID_PARAM, "missing datum key in publish commit");
    }
    String datumKey = keyNode.asText();
    String datumText = datumJson.toString();
    
    // The key prefix selects the concrete value type of the datum.
    Datum datum;
    if (KeyBuilder.matchInstanceListKey(datumKey)) {
        datum = JacksonUtils.toObj(datumText, new TypeReference<Datum<Instances>>() {
        });
    } else if (KeyBuilder.matchSwitchKey(datumKey)) {
        datum = JacksonUtils.toObj(datumText, new TypeReference<Datum<SwitchDomain>>() {
        });
    } else if (KeyBuilder.matchServiceMetaKey(datumKey)) {
        datum = JacksonUtils.toObj(datumText, new TypeReference<Datum<Service>>() {
        });
    } else {
        // Previously datum stayed null here and onPut failed later with an opaque
        // NullPointerException; report the bad key like publish() does.
        throw new NacosException(NacosException.INVALID_PARAM, "unknown type publish key: " + datumKey);
    }
    
    // Applies the datum through RaftConsistencyServiceImpl.onPut -> raftCore.onPublish.
    raftConsistencyService.onPut(datum, source);
    return "ok";
}
           

調用raftConsistencyService.onPut方法

/**
 * Applies a datum received from the leader on this node.
 *
 * <p>Delegates to {@code raftCore.onPublish}; any failure is logged and rethrown as a
 * {@link NacosException} with {@code SERVER_ERROR}, keeping the original exception as cause.
 *
 * @param datum  datum to apply
 * @param source peer that published the datum
 * @throws NacosException if applying the datum fails for any reason
 */
public void onPut(Datum datum, RaftPeer source) throws NacosException {
    try {
        raftCore.onPublish(datum, source);
    } catch (Exception cause) {
        Loggers.RAFT.error("Raft onPut failed.", cause);
        final String detail = "Raft onPut failed, datum:" + datum + ", source: " + source;
        throw new NacosException(NacosException.SERVER_ERROR, detail, cause);
    }
}
           

然後是raftCore.onPublish方法,也就是上面分析過的同一個方法:非leader節點在這裡完成校驗、持久化、緩存和任期更新。

總結

AP和CP模式下的資料一致性同步我們分析完了。對照著流程圖看還是很容易搞明白的。下面我們開始討論Nacos的Raft選舉。