Next comes RaftConsistencyServiceImpl, the CP (strong-consistency) implementation.
Again, we start with the put method:
@Override
public void put(String key, Record value) throws NacosException {
    try {
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft put failed.", e);
        throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
                e);
    }
}
It delegates to raftCore.signalPublish:
public void signalPublish(String key, Record value) throws Exception {
    // This node is not the leader
    if (!isLeader()) {
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);
        final RaftPeer leader = getLeader();
        // Forward the write to the leader: /v1/ns/raft/datum
        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }
    try {
        // This node is the leader
        // Acquire the lock
        OPERATE_LOCK.lock();
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }
        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
        // Apply the data change locally; peers is the set of all nodes, peers.local() is this node
        onPublish(datum, peers.local());
        final String content = json.toString();
        // The put only succeeds once a majority of nodes has synced: strong consistency, CP model
        // The latch only needs a majority of nodes
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        // Iterate over all nodes
        for (final String server : peers.allServersIncludeMyself()) {
            // The leader counts itself once
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            // /v1/ns/raft/datum/commit
            final String url = buildUrl(server, API_ON_PUB);
            // Replicate the data asynchronously
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content,
                    new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                Loggers.RAFT
                                        .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                                datum.key, server, response.getStatusCode());
                                return 1;
                            }
                            // Success: count down
                            latch.countDown();
                            return 0;
                        }
                        @Override
                        public STATE onContentWriteCompleted() {
                            return STATE.CONTINUE;
                        }
                    });
        }
        // Wait for a majority, with a timeout of RAFT_PUBLISH_TIMEOUT (5 seconds)
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            // only majority servers return success can we consider this update success
            Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
            throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
        }
        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        // Release the lock
        OPERATE_LOCK.unlock();
    }
}
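There are two branches here: the current node is either the leader or it is not.
When the current node is not the leader
In this case, raftProxy.proxyPostLarge forwards the message to the leader, where the publish method of RaftController receives it: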
@PostMapping("/datum")
public String publish(HttpServletRequest request, HttpServletResponse response) throws Exception {
    response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
    response.setHeader("Cache-Control", "no-cache");
    response.setHeader("Content-Encode", "gzip");
    String entity = IoUtils.toString(request.getInputStream(), "UTF-8");
    String value = URLDecoder.decode(entity, "UTF-8");
    JsonNode json = JacksonUtils.toObj(value);
    String key = json.get("key").asText();
    if (KeyBuilder.matchInstanceListKey(key)) {
        raftConsistencyService.put(key, JacksonUtils.toObj(json.get("value").toString(), Instances.class));
        return "ok";
    }
    if (KeyBuilder.matchSwitchKey(key)) {
        raftConsistencyService.put(key, JacksonUtils.toObj(json.get("value").toString(), SwitchDomain.class));
        return "ok";
    }
    if (KeyBuilder.matchServiceMetaKey(key)) {
        raftConsistencyService.put(key, JacksonUtils.toObj(json.get("value").toString(), Service.class));
        return "ok";
    }
    throw new NacosException(NacosException.INVALID_PARAM, "unknown type publish key: " + key);
}
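As you can see, it simply calls raftConsistencyService.put, which brings us back to where we started; since this node is the leader, signalPublish now takes the leader branch.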
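To make the round trip concrete, here is a minimal in-memory sketch of the two branches. It assumes a hypothetical `Node` class in place of RaftCore and a direct method call in place of the HTTP forward to /v1/ns/raft/datum; replication to followers is left out. It also mirrors how the leader versions each datum: 1 for a new key, otherwise the previous timestamp plus 1.

```java
import java.util.HashMap;
import java.util.Map;

public class ForwardToLeaderSketch {

    // Hypothetical stand-in for a cluster node; not a Nacos class.
    static final class Node {
        final String ip;
        Node leader; // this node's view of the current leader
        final Map<String, Long> versions = new HashMap<>();

        Node(String ip) {
            this.ip = ip;
        }

        boolean isLeader() {
            return leader == this;
        }

        // Mirrors the two branches of signalPublish: forward, or handle locally.
        long put(String key) {
            if (!isLeader()) {
                // Follower: hand the write to the leader (Nacos uses an HTTP POST;
                // here it is a direct call) and do nothing locally.
                return leader.put(key);
            }
            // Leader: a new key gets version 1, an existing key gets previous + 1,
            // like datum.timestamp in signalPublish.
            return versions.merge(key, 1L, (old, one) -> old + 1);
        }
    }

    public static void main(String[] args) {
        Node a = new Node("10.0.0.1");
        Node b = new Node("10.0.0.2");
        a.leader = a;
        b.leader = a;
        System.out.println(b.put("svc")); // 1 (forwarded from b to a)
        System.out.println(a.put("svc")); // 2
    }
}
```

Because the follower never touches its own state, every write is ordered by the single leader, which is what lets the leader assign monotonically increasing versions.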
When the current node is the leader
First, onPublish is called:
public void onPublish(Datum datum, RaftPeer source) throws Exception {
    RaftPeer local = peers.local();
    if (datum.value == null) {
        Loggers.RAFT.warn("received empty datum");
        throw new IllegalStateException("received empty datum");
    }
    // Only the leader may publish data
    if (!peers.isLeader(source.ip)) {
        Loggers.RAFT
                .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source),
                        JacksonUtils.toJson(getLeader()));
        throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
    }
    // Reject a stale publish from an older term
    if (source.term.get() < local.term.get()) {
        Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source),
                JacksonUtils.toJson(local));
        throw new IllegalStateException(
                "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get());
    }
    // Reset the leader-due timer (election timeout)
    local.resetLeaderDue();
    // if data should be persisted, usually this is true:
    if (KeyBuilder.matchPersistentKey(datum.key)) {
        raftStore.write(datum);
    }
    // Cache the datum in memory
    datums.put(datum.key, datum);
    if (isLeader()) {
        local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
    } else {
        if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
            //set leader term:
            getLeader().term.set(source.term.get());
            local.term.set(getLeader().term.get());
        } else {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        }
    }
    raftStore.updateTerm(local.term.get());
    // Queue a CHANGE notification for listeners
    notifier.addTask(datum.key, ApplyAction.CHANGE);
    Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
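This method does several things:
- Resets the leader-due timer (resetLeaderDue), i.e. the election timeout
- If the data should be persisted, writes it to disk with raftStore.write(datum)
- Caches the datum in the local datums map
- Updates the term
- Adds a CHANGE task to the notifier so listeners are told about the change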
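The checks and the term update can be tried in isolation. In this minimal sketch the `Peer` class, the `leaderIp` field and the increment of 100 are assumptions for illustration (Nacos uses RaftPeer objects and its own PUBLISH_TERM_INCREASE_COUNT constant); persistence, resetLeaderDue and the notifier are omitted.

```java
import java.util.HashMap;
import java.util.Map;

public class OnPublishSketch {

    // Assumed value for illustration; Nacos defines its own PUBLISH_TERM_INCREASE_COUNT.
    static final long TERM_INCREMENT = 100L;

    // Hypothetical, simplified peer (not Nacos's RaftPeer).
    static final class Peer {
        final String ip;
        long term;

        Peer(String ip, long term) {
            this.ip = ip;
            this.term = term;
        }
    }

    final Peer local;
    final String leaderIp; // which node this peer currently believes is the leader
    final Map<String, String> datums = new HashMap<>();

    OnPublishSketch(Peer local, String leaderIp) {
        this.local = local;
        this.leaderIp = leaderIp;
    }

    void onPublish(String key, String value, Peer source) {
        if (value == null) {
            throw new IllegalStateException("received empty datum");
        }
        // Only the known leader may publish.
        if (!source.ip.equals(leaderIp)) {
            throw new IllegalStateException("peer(" + source.ip + ") tried to publish data but wasn't leader");
        }
        // A publish carrying an older term is stale.
        if (source.term < local.term) {
            throw new IllegalStateException(
                    "out of date publish, pub-term:" + source.term + ", cur-term: " + local.term);
        }
        datums.put(key, value);
        if (local.ip.equals(leaderIp)) {
            local.term += TERM_INCREMENT;   // the leader bumps its own term
        } else if (local.term + TERM_INCREMENT > source.term) {
            local.term = source.term;       // a follower adopts the leader's term
        } else {
            local.term += TERM_INCREMENT;
        }
    }

    public static void main(String[] args) {
        Peer leader = new Peer("10.0.0.1", 5);
        new OnPublishSketch(leader, leader.ip).onPublish("svc", "v1", leader);
        System.out.println(leader.term); // 105

        // Followers receive the term serialized before the leader's increment (5 here).
        Peer follower = new Peer("10.0.0.2", 5);
        new OnPublishSketch(follower, "10.0.0.1").onPublish("svc", "v1", new Peer("10.0.0.1", 5));
        System.out.println(follower.term); // 5
    }
}
```

The main method reflects an ordering detail of signalPublish: `peers.local()` is serialized into `source` before the leader calls onPublish on itself, so followers see the leader's pre-increment term.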
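Consistency synchronization
The leader then syncs the data to every node in the cluster through the /v1/ns/raft/datum/commit endpoint. The CountDownLatch is initialized to cluster size / 2 + 1 (peers.majorityCount()), so the put succeeds only once a majority, the leader included, has acknowledged it.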
final String content = json.toString();
// The put only succeeds once a majority of nodes has synced: strong consistency, CP model
// The latch only needs a majority of nodes
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
// Iterate over all nodes
for (final String server : peers.allServersIncludeMyself()) {
    // The leader counts itself once
    if (isLeader(server)) {
        latch.countDown();
        continue;
    }
    // /v1/ns/raft/datum/commit
    final String url = buildUrl(server, API_ON_PUB);
    // Sync the data
    HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content,
            new AsyncCompletionHandler<Integer>() {
                @Override
                public Integer onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        Loggers.RAFT
                                .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                        datum.key, server, response.getStatusCode());
                        return 1;
                    }
                    // Success: count down
                    latch.countDown();
                    return 0;
                }
                @Override
                public STATE onContentWriteCompleted() {
                    return STATE.CONTINUE;
                }
            });
}
// Wait for a majority, with a timeout of RAFT_PUBLISH_TIMEOUT (5 seconds)
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
    // only majority servers return success can we consider this update success
    Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
    throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}
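The quorum pattern can be tried on its own, with a thread pool standing in for the asynchronous HTTP calls. This sketch assumes each follower's outcome is known in advance (true stands for HTTP 200); as in the code above, only successes count down and the leader counts itself once.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MajorityLatchSketch {

    // Same formula as peers.majorityCount(): size / 2 + 1.
    static int majorityCount(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    /**
     * Returns true if a majority (leader included) acknowledged within timeoutMs.
     * peerAcks holds one simulated outcome per follower: true = success, false = failure.
     */
    static boolean replicate(List<Boolean> peerAcks, long timeoutMs) throws InterruptedException {
        int clusterSize = peerAcks.size() + 1; // followers + leader
        CountDownLatch latch = new CountDownLatch(majorityCount(clusterSize));
        latch.countDown(); // the leader counts itself once
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, peerAcks.size()));
        try {
            for (boolean ok : peerAcks) {
                pool.submit(() -> {
                    // Only a successful response counts down; a failure just returns.
                    if (ok) {
                        latch.countDown();
                    }
                });
            }
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(majorityCount(3)); // 2
        System.out.println(majorityCount(4)); // 3
        System.out.println(majorityCount(5)); // 3
        // 5 nodes: leader + 2 successful followers = 3, a majority
        System.out.println(replicate(List.of(true, true, false, false), 500)); // true
        // 5 nodes: leader + 1 successful follower = 2, so the wait times out
        System.out.println(replicate(List.of(true, false, false, false), 200)); // false
    }
}
```

With five nodes the latch starts at 3, so two failed followers do not block the put as long as two others answer before the timeout; a four-node cluster still needs three acknowledgements, which is why odd cluster sizes are usually preferred.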
Now let's look at the receiving side of the sync, the onPublish method of RaftController:
@PostMapping("/datum/commit")
public String onPublish(HttpServletRequest request, HttpServletResponse response) throws Exception {
    response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
    response.setHeader("Cache-Control", "no-cache");
    response.setHeader("Content-Encode", "gzip");
    String entity = IoUtils.toString(request.getInputStream(), "UTF-8");
    String value = URLDecoder.decode(entity, "UTF-8");
    JsonNode jsonObject = JacksonUtils.toObj(value);
    String key = "key";
    RaftPeer source = JacksonUtils.toObj(jsonObject.get("source").toString(), RaftPeer.class);
    JsonNode datumJson = jsonObject.get("datum");
    Datum datum = null;
    if (KeyBuilder.matchInstanceListKey(datumJson.get(key).asText())) {
        datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<Instances>>() {
        });
    } else if (KeyBuilder.matchSwitchKey(datumJson.get(key).asText())) {
        datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<SwitchDomain>>() {
        });
    } else if (KeyBuilder.matchServiceMetaKey(datumJson.get(key).asText())) {
        datum = JacksonUtils.toObj(jsonObject.get("datum").toString(), new TypeReference<Datum<Service>>() {
        });
    }
    // Delegate to RaftConsistencyServiceImpl.onPut, which calls raftCore.onPublish
    raftConsistencyService.onPut(datum, source);
    return "ok";
}
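Both publish and onPublish pick the deserialization type from the key before doing anything else. Here is a minimal sketch of that dispatch; the key formats and placeholder classes are hypothetical and do not reproduce the real KeyBuilder rules.

```java
public class KeyDispatchSketch {

    // Hypothetical placeholder types standing in for Instances, SwitchDomain and Service.
    static final class Instances {}
    static final class SwitchDomain {}
    static final class ServiceMeta {}

    // Hypothetical key formats; the real checks are KeyBuilder.matchInstanceListKey,
    // matchSwitchKey and matchServiceMetaKey.
    static final String INSTANCE_LIST_PREFIX = "iplist.";
    static final String SERVICE_META_PREFIX = "meta.";
    static final String SWITCH_KEY = "meta.switch";

    /** Chooses the payload class for a key, as RaftController does before deserializing. */
    static Class<?> payloadType(String key) {
        if (key.startsWith(INSTANCE_LIST_PREFIX)) {
            return Instances.class;
        }
        // In this sketch the switch key also starts with the meta prefix,
        // so the more specific check has to run first.
        if (key.equals(SWITCH_KEY)) {
            return SwitchDomain.class;
        }
        if (key.startsWith(SERVICE_META_PREFIX)) {
            return ServiceMeta.class;
        }
        // Like publish(), reject unknown keys (onPublish() instead leaves datum as null).
        throw new IllegalArgumentException("unknown type publish key: " + key);
    }

    public static void main(String[] args) {
        System.out.println(payloadType("iplist.demo").getSimpleName()); // Instances
        System.out.println(payloadType("meta.switch").getSimpleName()); // SwitchDomain
        System.out.println(payloadType("meta.demo").getSimpleName());   // ServiceMeta
    }
}
```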
It calls raftConsistencyService.onPut:
public void onPut(Datum datum, RaftPeer source) throws NacosException {
    try {
        raftCore.onPublish(datum, source);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft onPut failed.", e);
        throw new NacosException(NacosException.SERVER_ERROR,
                "Raft onPut failed, datum:" + datum + ", source: " + source, e);
    }
}
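This in turn calls raftCore.onPublish, the method we went through above.
Summary
We have now covered data-consistency synchronization in both AP and CP modes; read alongside the flowchart, it is easy to follow. Next, we turn to Raft leader election in Nacos.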