天天看點

SOFAJRaft 在同程旅遊中的實踐

作者:

· 趙延 |GitHub:horizonzy

同程藝龍進階開發,負責服務治理相關工作,關注 RPC、服務治理和分布式等領域。

· 董春明 

同程藝龍架構師,負責服務治理及雲原生規劃演進相關工作,分布式領域專家,Paper 愛好者。

校對:馮家純(SOFAJRaft Committer)

同程藝龍作為對 Raft 研究較早的公司,早在 14 年算法的 paper 剛釋出的時候,便已經對其進行了調研。同時也與 paxos 、zab 等算法進行了詳細的對比,并在公司内部的計數器、任務排程元資訊存儲等場景進行試點。

不過早期對于 Raft 的嘗試較多的是在 C++ 技術棧試水,在 Java 技術棧裡卻很少有涉及。

近期剛好有基于 etcd 的老項目由于需要自定義的資料結構需要重構,原有的 etcd 無法在底層資料結構層面滿足需求,是以決定采用 Java 技術棧結合開源項目 SOFAJRaft 進行底層資料存儲的開發,以期解決多節點資料強一緻的問題。

本文假設讀者對 Raft 及強一緻的概念已經有了較深的了解,詳細介紹了公司内部如何使用 JRaft 的進行老系統的改造以及使用過程中遇到的工程問題,希望其他對 Raft 有興趣的同學可以一起讨論。

PART. 1

背  景

公司内部原本存在一個系統 mdb (metadata database),go 語言編寫,用于管理所有的執行個體中繼資料資訊,中繼資料的内容就是一個 map。

該元件提供對中繼資料增删改查的接口,并且使用  go 語言編寫,在檢索資料時引入了 K8s selector 的包,使用 K8s selector 的解析規則篩選特定标簽的中繼資料資訊。

資料持久化則是實用了強一緻元件 etcd 進行存儲,key 為中繼資料的 ID,保證唯一,value 為具體的元資訊,包含各個标簽以及對應的值。

該系統大體架構如圖 -1 所示:

SOFAJRaft 在同程旅遊中的實踐

圖-1:原來的架構

「該架構的弊端」

1. 每隔一段時間需要去拉取 etcd 的全量資料,擔心單次請求資料量太大的情況,對資料 ID 進行了 hash 分片,依次擷取每個分片下個 key,再通過 key 去擷取 value,導緻 etcd 的查詢頻率非常高。

2. 非 ID 查詢條件的一緻性查詢,和上面的問題一樣,也需要做拉取全量資料的操作。

3. 更新删除操作也是一樣,需要拉取全量資料再操作。

分析以上問題可以發現,使用 etcd 作為強一緻存儲,但 etcd 是基于 KV 存儲的元件,并且解析元件 mdb 和 etcd 是分離的,在需要保證資料最新的情況下,必須得去 etcd 拿最新的資料到本地再進行操作。

而 etcd 基于 KV,就得拿到 etcd 的全量資料都給拉到本地後再做操作。

如果有一個元件,提供強一緻的存儲能力,又能直接去解析 K8s selector 的規則,存儲的資料結構和中繼資料資訊更加親和,那麼中間的那一層 mdb 就可以直接去掉了,由這個元件來解析對應的 crud 規則,将解析過後的規則直接在本地查詢,那麼以上問題就能夠直接解決了。

PART. 2

改  造

基于以上問題,我們準備自己開發一個強一緻存儲的元件,能夠自己解析 K8s selector 的規則,并且将資料儲存在自己本地。

因為個人對 Java 語言比較了解,并且之前使用 Nacos 時,對 SOFAJRaft 也有一定了解,最終選擇了 SOFAJRaft 來建構強一緻存儲元件,将它命名為 mdb-store。

 主要改造點:

1. 使用 SOFAJRaft 程式設計模型建構業務狀态機,業務狀态機中根據 Raft log 中的 data 類型,進行 crud 的操作。

2. mdb-store 提供與原來 mdb 相同的 api,讓改造對使用者透明,改造完成後隻需要切換域名對應的執行個體即可。

3. 遷移 K8s selector 的解析邏輯,這裡用 Java 寫了一套和 go 版本 K8s selector 一樣解析邏輯的元件 K8s-selector-Java。

改造過後的架構如圖-2所示:

SOFAJRaft 在同程旅遊中的實踐

圖-2:重構後的架構

通過改造過後,将 mdb 移除,讓使用者直接和 mdb-store 進行通信,中間的通信少了一跳,加快了通路效率。将 mdb 和 etcd 也進行了移除,減少了整個系統的元件,降低運維成本。

PART. 3

SOFAJRaft 的具體使用

 将寫操作轉換成 Raft log 

在 SOFAJRaft 中,程式設計模型和通常的 Spring MVC 的程式設計模式不太一樣。

在 Spring MVC 中,一個請求到達了後端,通常會通過 Controller -> Service -> Processor 這麼幾層。Controller 負責本次 http 請求的資源映射, 再由 Controller 去調用特定的 Service 方法,在 Service 層中,對參數進行一些處理和轉換,再交由 Processor 層去對請求做真正的處理。

大體邏輯如圖 -3 所示,

SOFAJRaft 在同程旅遊中的實踐

圖-3:通常的程式設計模型

在 SOFAJRaft 中,所有的寫操作都要通過狀态機去執行(讀操作不需要經過狀态機)。需要将寫操作轉換成 task,狀态機去應用 task 然後再進行業務處理。

task 中包含兩個屬性是需要關注的,一個是 done,一個是 data。

- done 就是本次 task 被狀态機處理完成後的回調,比如在 done 的回調邏輯中,将 response flush 到用戶端。

- data 就是 Raft log 中的具體資料,比如要執行一條插入中繼資料的指令。data 就會包含本次操作的類型(插入),以及本次操作的具體資料。

public class Task implements Serializable {
    private ByteBuffer        data             = LogEntry.EMPTY_DATA;
    private Closure           done;
    /// 省略部分代碼
}      

大體邏輯如圖 -4 所示,

SOFAJRaft 在同程旅遊中的實踐

圖-4:SOFAJRaft 的程式設計模型

所有的操作都抽象成一個實體 Operation,在 Service 層,就根據業務把參數轉換成不同的 Operation,然後再将 Operation 序列化,設定到 Task 中的 data 屬性,再由 Node 将 task 送出。

這裡可以将 task 看成是一個 Raft log,一旦 Raft log 被半數的機器給送出。狀态機就會應用 Raft log,應用完成之後就會觸發 done 中的回調。

· 抽象過後的實體類

class Operation<T> {
    //操作類型,比如增删改查
    int type;
    //操作的哪個表,某些類型不需要此字段
    String table;
    //具體的操作資料,根據 type 的不同,資料類型也會不同
    T params;
}      

· 建構 task 并通過 node 送出給狀态機

final Task task = new Task();
//定義回調的邏輯,當該 Raft log 被狀态機應用後,會進行回調
task.setDone(new StoreClosure(operation, status -> {
    StoreStatus storageStatus = (StoreStatus) status;
    closure.setThrowable(storageStatus.getThrowable());
    closure.setResponse(storageStatus.getResponse());
    closure.run(storageStatus);
}));
//将 operation 進行序列化,在狀态機中會将該值反序列化還原,再交給 processor 處理
task.setData(ByteBuffer.wrap(serializer.serialize(operation)));
node.apply(task);      

 狀态機的實作 

· onApply

onApply 是狀态機的核心功能,其目的就是接收入參中的 Raft log 以及 done,然後将 Raft log 中的資料反序列化,交由自己的業務處理器去進行處理。

處理完成之後,觸發 done 的回調,這裡就和 Node.apply(task) 關聯上了。

while (iter.hasNext()) {
                Status status = Status.OK();
                try {
                    if (iter.done() != null) {
                        // 說明目前狀态機是 Leader,可以直接從 closure 中擷取操作資料
                        closure = (MdbStoreClosure) iter.done();
                        operation = closure.getOperation();
                    } else {
                        // 目前狀态機不是 Leader,通過對 Raft log 中的資料進行反序列化操作,還原操作資料
                        ByteBuffer data = iter.getData();
                        operation = serializer.deserialize(data.array(), Operation.class);
                    }
                    //業務處理器進行業務處理,業務處理器中會判斷 operation 的類型,選擇不同的處理邏輯
                    OperationResponse result = mdbStoreProcessor.processOperation(operation);
                    //将 result 序列化
                    GrpcResponse response = GrpcResponse.newBuilder().setSuccess(true)
                            .setData(ByteString.copyFrom(serializer.serialize(result))).build();
          
                    Optional.ofNullable(closure)
                            .ifPresent(closure1 -> closure1.setResponse(response));
                } catch (Throwable e) {
                    status.setError(RaftError.UNKNOWN, e.toString());
                    Optional.ofNullable(closure).ifPresent(closure1 -> closure1.setThrowable(e));
                    throw e;
                } finally {
                    //對 task 中的 done 進行回調
                    Optional.ofNullable(closure).ifPresent(closure1 -> closure1.run(status));
                }
                //将 Raft log 的消費位置 +1,表示目前這個 Raft log 已經被成功應用了
                iter.next();
            }      

· onSnapshotSave

- 在初始化 SOFAJRaft node 時,存在一個參數,NodeOptions#snapshotUri。

- 該參數設定後就會開啟 snapshot 機制,一般是推薦設定。

- 開啟完成之後,每隔 30 分鐘 就會進行一次 snapshot(這裡需要注意的是,30 分鐘内有 Raft log 送出時,才會進行 snapshot)。

- 在進行 snapshot 的時候,需要把目前的資料進行持久化操作。

- 在 snapshot 完成後,就會将 snapshot 中最後一條 Raft log 之前的 raft-log 全部删除。其意義就是避免 Raft log 一直增加,導緻磁盤占用飙高。

snapshot 機制可以這麼去了解,在 SOFAJRaft 中,業務 processor 中的操作都是狀态機驅動的,而狀态機又是由 Raft log 驅動。那麼 processor 中資料的最終形态其實就是所有的 Raft log 應用的總和。

比如存在一個 Raft log,其業務含義是 i++。10 條 Raft log 被狀态機應用後,驅動 processor 進行 10 次 i++ 操作,最終的值就是為 10。應用就算崩潰重新開機後,重新開機時,他會去應用之前的 10 條 i++ 的Raft log,processor 中的值也還是 10。使用 snapshot 機制,在進行 snapshot 時,把 processor 中的 10 進行持久化,持久化完成過後,将前 10 條 Raft log 進行删除,後續再來 2 條 i++ 的 Raft log,processor 的值變為 12,存在 2 條 i++ 的 Raft log。應用就算崩潰重新開機,那麼它首先也會讀取 snapshot 中的資料 10,再去應用 2 條 i++ 的 Raft log,最終資料也是 12,和崩潰之前保持一緻。

Processor 的最終态 = snapshot + Raft log

MdbStoreStoreSnapshotFileImpl mdbStoreStoreSnapshotFile = (MdbStoreStoreSnapshotFileImpl) snapshotFile;
        String tempPath = snapshotPath + "_temp";
        File tempFile = new File(tempPath);
        FileUtils.deleteDirectory(tempFile);
        FileUtils.forceMkdir(tempFile);
        //記錄總共的 table 數量
        mdbStoreStoreSnapshotFile
                .writeToFile(tempPath, "tailIndex", new TailIndex(persistData.size()));
        //将每一個 table 中的資料都進行持久化
        for (int i = 0; i < persistData.size(); i++) {
            mdbStoreStoreSnapshotFile.writeToFile(tempPath, combineDataFile(i),
                    new TablePersistence(persistData.get(i)));
        }
        File destinationPath = new File(snapshotPath);
        FileUtils.deleteDirectory(destinationPath);
        FileUtils.moveDirectory(tempFile, destinationPath);      

· onSnapshotLoad

onSnapshotLoad 的幾個觸發場景。

1. 當一個節點重新啟動時。

2. 當 Follower 中的 commit-index 都小于 Leader 中 snapshot 的最後一條 Raft log 時(Follower 太落後了,Follower 需要的 Raft log 已經被 Leader 的 snapshot 機制删除了)

onSnapshotLoad 和上面的 onSnapshotSave 是成對的,這裡隻需要把之前儲存的檔案中的記憶體讀取,然後再進行反序列化,添加到 processor 中的資料容器即可。

MdbStoreStoreSnapshotFileImpl mdbStoreStoreSnapshotFile = (MdbStoreStoreSnapshotFileImpl) snapshotFile;
        //讀取總共的檔案數
        TailIndex tailIndex = mdbStoreStoreSnapshotFile
                .readFromFile(snapshotPath, TAIL_INDEX, TailIndex.class);

        int size = tailIndex.data();

        for (int i = 0; i < size; i++) 
            //挨個讀取檔案,将檔案内容進行反序列化
            TablePersistence tablePersistence = mdbStoreStoreSnapshotFile
                    .readFromFile(snapshotPath, combineDataFile(i), TablePersistence.class);
            TableDataDTO data = tablePersistence.data();
            
            Table table = new Table(data.getName(), new HashSet<>(data.getIndexNames()),
                    data.getRetryCount());
            for (Record dataData : data.getDatas()) {
                table.addRecord(dataData);
            }
            //将資料丢給 processor 中的資料容器
            dataComponent.putData(table.getName(), table);
        }      

· 狀态機的其他狀态變更的方法

一般來說,節點的狀态是不會發生變化的,一旦發生變化,就需要去分析應用的狀态了,觀察節點是否正常。

StateMachine 提供了狀态回調的接口,我們在回調中對接内部的監控系統,當狀态機的節點狀态發生變化時,會實時通知到維護人員,維護人員再對應用進行分析排查。

 使用 read-index read 進行讀操作 

按照 Raft 論文正常來說,讀寫操作都隻能由 Leader 進行處理,這樣能夠保證讀取的資料都是一緻的。這樣的話,讀請求的吞吐就沒辦法增加。

關于這個 case,SOFAJRaft 提供了 read-index read,可以在 Follower 中進行讀取操作,并且能保證在 Follower 中讀的結果和在 Leader 中讀的結果一緻。

關于 read-index read 可以參考 pingcap 的這篇部落格:

https://pingcap.com/zh/blog/lease-read
com.alipay.sofa.jraft.Node#readIndex(final byte[] requestContext, final ReadIndexClosure done)
      

第一個參數是發起 read-index read 時的上下文,可以在回調中使用。

第二個參數就是具體的回調邏輯,需要在 run 方法中實作讀取邏輯。

· read-index read 程式設計模型

CompletableFuture future = new CompletableFuture<>();    
        node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
                @Override
                public void run(Status status, long index, byte[] reqCtx) {
                    //狀态 ok,說明可以通過 read-index 去進行讀取
                    if (status.isOk()) {
                        try {
                            //直接使用 processor 查詢資料,不通過狀态機
                            OperationResponse<T> res = (OperationResponse<T>) mdbStoreProcessor
                                    .processOperation(operation);
                            future.complete(res);
                        } catch (Throwable t) {
                            future.completeExceptionally(
                                    new IllegalStateException("Fail to read data from processor",
                                            t));
                        }
                    } else {
                        //狀态不 ok,可能是逾時,也可能是狀态機異常等其他原因
                        if (Operation.ALL_DATA == operation.getType()) {
                            //這裡判斷是不是讀取全量的資料,讀取全量資料的話,需要快速失敗,不能轉到 leader 走 raft log讀取,                            
                            //原因見 4.3
                            future.completeExceptionally(new IllegalStateException(
                                    "Fail to get all data by read-index read, status: " + status
                                            .getErrorMsg()));
                        } else {
                            //通過将本次請求轉發到 Leader 中,走 raft log,在 Leader 的狀态機中把本條 raft log 應用後,再                             
                            //傳回資料給 Follower
                            LOGGER.warn("ReadIndex read failed, status: {}, go to Leader read.",
                                    status.getErrorMsg());
                            readFromLeader(operation, future);
                        }
                    }
                }
            }
                       
        Object o = future.get(5_000L, TimeUnit.MILLISECONDS);
        if (o instanceof GrpcResponse) {
            //傳回類型的 GrpcResponse,說明本次請求是通過 Raft log 轉到 Leader 處理并傳回的,需要将資料反序列化
            return serializer
                    .deserialize(((GrpcResponse) o).getData().toByteArray(), OperationResponse.class);
        } else {
            //直接在本地通過 read-index read 讀本地記憶體
            return (OperationResponse<T>) o;
        }                   

 Follower 請求轉發 

在 SOFAJRaft 中,所有的寫請求都隻能由 Leader 節點進行處理,當寫請求落到了 Follower 中,有兩種方式進行處理。

1. 直接拒絕該請求,并将 Leader 節點的位址傳回給用戶端,讓用戶端重新發起請求。

2. 将目前請求 hold 在服務端,并将該請求轉發到 Leader 節點,Leader 節點處理完成後,将 response 傳回給 Follower,Follower 再将之前 hold 住的請求傳回給用戶端。

這裡使用第一種時,需要用戶端也進行相應的改造,為了對用戶端透明,我們選擇了第二種,通過轉發的方式,将請求轉給 Leader。

在 SOFAJRaft 中,各個節點需要通過 RPC 來進行通信,比如發送心跳,投票等。

SOFAJRaft 預設提供了兩種通信方式,一種是 sofa-bolt,還有一種是 grpc,考慮到元件的流行性,選擇了grpc來作為通信方式。在建構 server 時,使用 GrpcRaftRpcFactory 在建立 RpcServer 。然後将 SOFAJRaft 中自帶的處理器(心跳處理器,投票處理器等)注冊到 RpcServer中。這些處理器都是實作了 RpcProcessor 接口,該接口的 handleRequest 方法會處理收到的請求。

使用 GrpcRaftRpcFactory 需要注意的是,需要引入依賴。

<dependency>
    <groupId>com.alipay.sofa</groupId>
    <artifactId>rpc-grpc-impl</artifactId>
    <version>${jraft.grpc.version}</version>
</dependency>      

并且需要通過 spi 指定使用 GrpcRaftRpcFactory。

檔案路徑 /resources/META-INF.services/com.alipay.sofa.jraft.rpc.RaftRpcFactory,檔案内容 com.alipay.sofa.jraft.rpc.impl.GrpcRaftRpcFactory。

這裡,可以定義一個自己的處理器,實作 RpcProcessor 接口,将該 Processor 也注冊到 RpcServer 中,複用同一個 RpcServer。

· 建立 RpcServer 并注冊處理器

//擷取 GrpcRaftRpcFactory
        GrpcRaftRpcFactory raftRpcFactory = (GrpcRaftRpcFactory) RpcFactoryHelper.rpcFactory();
        //GrpcRequest 是自己的 Processor 通信使用,這裡使用 proto 去生成 GrpcRequest 和 GrpcResponse
        raftRpcFactory.registerProtobufSerializer(GrpcRequest.class.getName(),
                GrpcRequest.getDefaultInstance());
        raftRpcFactory.registerProtobufSerializer(GrpcResponse.class.getName(),
                GrpcResponse.getDefaultInstance());
        
        MarshallerRegistry registry = raftRpcFactory.getMarshallerRegistry();
         //注冊 GrpcRequest 對應的 response 的預設對象
        registry.registerResponseInstance(GrpcRequest.class.getName(),
                GrpcResponse.getDefaultInstance());
        //建立 GrpcServer
        final RpcServer rpcServer = raftRpcFactory.createRpcServer(peerId.getEndpoint());
         //注冊 sofa-jraft 中自帶的處理器
        RaftRpcServerFactory.addRaftRequestProcessors(rpcServer, RaftExecutor.getRaftCoreExecutor(),
                RaftExecutor.getRaftCliServiceExecutor());
        //注冊自己業務的處理器
        rpcServer.registerProcessor(new GrpcRequestProcessor(server));

        return rpcServer;      

· proto file

syntax = "proto3";
option java_multiple_files = true;
package com.xxx.mdb.store.raft.entity;

message GrpcRequest {
  //這裡的 data 儲存的就是 Operation 序列化過後的二進制流
  bytes data =1;
}

message GrpcResponse {
  //這裡的 data 儲存的是業務 Processor 處理完 Operation 過後,并且經過序列化後的二進制流
  bytes data = 1;
  //異常資訊
  string errMsg = 2;
  //标志位,請求是否 ok
  bool success = 3;
}      

· 自己的處理器,用于接收 Follower 過來的轉發請求。

//如果目前節點不是 Leader,不進行處理
        if (!jRaftServer.getNode().isLeader()) {
            return;
        }
        //定義 done,狀态機應用 Raft log 後,會回調這個 done
        FailoverClosure done = new FailoverClosure() {

            GrpcResponse data;

            Throwable ex;

            @Override
            public void setResponse(GrpcResponse data) {
                //Follwer 在狀态機中執行成功後,會将 result 封裝成 GrpcResponse,然後在這裡設定
                this.data = data;
            }

            @Override
            public void setThrowable(Throwable throwable) {
                //在異常時,會進行調用
                this.ex = throwable;
            }

            @Override
            public void run(Status status) {
                if (Objects.nonNull(ex)) {
                    LOGGER.error("execute has error", ex);
                    //ex 不為 null,說明發生了異常,将異常傳回給 Follower
                    rpcCtx.sendResponse(
                            GrpcResponse.newBuilder().setErrMsg(ex.toString()).setSuccess(false)
                                    .build());
                } else {
                    //将請求傳回 Follower
                    rpcCtx.sendResponse(data);
                }
            }
        };
        //将從 Follower 過來的請求送出給狀态機,在内部會把 request 的 data 字段給反序列化為 Operation
        jRaftServer.applyOperation(jRaftServer.getNode(), request, done);      

· Follower 中的轉發邏輯

try {
            //将 operation 序列化成 byte 數組,然後建構 GrpcRequest.
            GrpcRequest request = GrpcRequest.newBuilder()
                    .setData(ByteString.copyFrom(serializer.serialize(operation))).build();
            //從緩存擷取目前 Leader 節點的位址,如果 Leader 為空,抛出異常。這裡的 Leader 需要動态重新整理,每隔5秒中就去重新整理一次                 
            //Leader,保證 Leader 是最新的。可以通過 RouteTable#refreshLeader 去定時重新整理。
            final Endpoint leaderIp = Optional.ofNullable(getLeader())
                    .orElseThrow(() -> new IllegalStateException("Not find leader")).getEndpoint();
            //通過 grpc 将請求發送給自己的處理器
            cliClientService.getRpcClient().invokeAsync(leaderIp, request, new InvokeCallback() {
                @Override
                public void complete(Object o, Throwable ex) {
                    if (Objects.nonNull(ex)) {
                        //存在異常,将異常進行回調
                        closure.setThrowable(ex);
                        //進行 fail 的回調,回調中會将 exception 傳回給用戶端
                        closure.run(new Status(RaftError.UNKNOWN, ex.getMessage()));
                        return;
                    }
                    //将 grpc response 設定給回調類
                    closure.setResponse((GrpcResponse) o);
                    //進行 success 的回調,回調中會将資料傳回給用戶端
                    closure.run(Status.OK());
                }

                @Override
                public Executor executor() {
                    return RaftExecutor.getRaftCliServiceExecutor();
                }
            }, timeoutMillis);
        } catch (Exception e) {
            closure.setThrowable(e);
            closure.run(new Status(RaftError.UNKNOWN, e.toString()));
        }       

PART. 4

SOFAJRaft 的一些實踐

 read-index read 

 傳回資料量過大導緻 oom 

在我們的業務場景中,有一個擷取全量資料的接口,并且是通過 read-index read 去進行讀資料的。在對這個接口進行壓測時,會發現 CPU 飙高的情況,經過排查,是由于堆記憶體占滿了,GC 線程一直在 work 導緻的。經過 dump 堆記憶體後發現,是由于内部使用 Disruptor 導緻的問題,該問題目前已被我們修複,并且也已回報給社群,在 1.3.8 版本中進行了解決。

具體問題見 issue#618

 響應時間較長 

在測試同學進行壓測,發現讀取接口的最大耗時偶爾會跑到 500ms,平均響應耗時大概在 100ms 左右。經過反複排查以及閱讀代碼,最終發現這個問題和 election timeout  有關。

在 SOFAJRaft 中,election timeout 就是選舉逾時的時間,一旦超過了 election timeout,Follwer 還沒有收到 Leader 的心跳,Follower 認為目前叢集中沒有 Leader,自己發起投票來嘗試當選 Leader。

正常情況下,Leader 給 Follower 發心跳的頻率是 election timeout / 10,也就是說在 election timeout 期間内,Leader 會給 Follower 發 10 次心跳,Follower 10次都沒有收到心跳的情況下,才會發生選舉。

而恰巧的是,我設定的 election timeout 剛好就是 5s,5s / 10 剛好就是 500ms。

于是進一步分析 read-index read 的機制,當 Follower 使用 read-index read 時,首先要去 Leader 擷取 Leader 目前的 commit index,然後需要等待 Follower 自己的狀态機的 apply index 超過從 Leader 那邊擷取到的 commit index,然後才會進行 read-index read 的回調。

而 Follower 的狀态機的 apply 操作是通過 Leader 的心跳請求驅動的,Leader 中能夠知道 Raft log 是否被半數送出了,一旦某一條 Raft log 被半數送出,Leader 在下一次的心跳請求中就會發最新的 commit index 同步給 Follower,Follower 收到新的 commit index 後,才會驅動自己的狀态機去 apply Raft log。

而心跳請求的頻率又是 election timeout / 10,所有會存在 read-index read 偶爾的響應時間會是 election timeout / 10.

如何解決:

基于以上分析,将 election timeout 的時間調整為了 1s,心跳頻率也就變成了 100ms,最大的響應耗時也就變低了,平均響應耗時也降低到了 4ms 左右。

read-index read 大概邏輯如圖-5所示,

SOFAJRaft 在同程旅遊中的實踐

圖-5:read-index read 處理邏輯

大響應接口失敗後轉發請求到 

 leader 導緻狀态機阻塞 

在一次排查問題的過程中,懷疑網絡存在問題。于是聯系運維同學,運維同學對執行 tcpdump 指令,對網絡進行了抓包。

整個叢集分為 3 個機房,2+2+1 的模式進行部署,1 這個節點的網絡偶爾會存在波動。在當時執行 tcpdump 過後 4 分鐘,到1這個節點的讀請求就開始發生 read-index timeout 了,而當時的邏輯是,隻要 read-index read 回調狀态不 ok,就将該請求轉發到 Leader,走 Raft log 來進行處理。

這裡存在一個接口,是去讀所有的資料,資料量比較大。當 read-index read 逾時時,會将這個請求轉發到了 Leader 節點,走 Raft log 去讀資料,走 Raft log 就會在狀态機中去進行處理,而這個請求的 response 比較大,導緻在擷取完資料後,去序列化資料時比較耗時,大概需要消耗 1500ms,狀态機中處理 Raft log 的吞吐就降低了。并且 Raft log 是會從 Leader 複制給 Follower 的,也就是說,Follower 的狀态機也會去執行這個耗時 1500 ms的 Raft log,隻是 Follower 不對 response 做處理而已。

在上面描述了 read-index read 的邏輯,Follower 要執行 read-index read,需要狀态機的 apply-index 追上 Leader 的 commit index,當發生上述網絡波動時,這個大接口走 Raft log 的方式,降低了狀态機處理 Raft log 的吞吐,導緻 Follwer 的 apply index 更難追上 Leader 的 commit index 了。

是以陷入了惡性循環,這個大接口一緻通過 Raft log 轉向 Leader 去讀取資料,而這個 Raft log 處理非常耗時。

最終導緻狀态機的 apply index 遠遠小于 commit index,所有的用戶端的讀操作和寫操作全部都逾時。

将這個大接口的讀取操作改成快速失敗,一旦 read-index read 的回調不成功,不把請求通過 Raft log 轉到 Leader 去,直接傳回異常給用戶端,讓用戶端重試。

 snapshot 操作時,

 阻塞狀态機應用 Raft log,導緻響應逾時 

系統在壓測時,跑着跑着用戶端偶爾會逾時。經過反複排查,發現逾時的時間點和 snapshot 的時間點重合。根據閱讀代碼發現,狀态機的 apply 操作和 snapshot 操作預設是同步的,而 snapshot 比較耗時,導緻了狀态機 apply Raft log 時間被延長了,進而用戶端請求逾時。

在 snapshot 時,将 snapshot 的操作變為異步操作,使用 copy on write 把 snapshot 時的記憶體資料 copy 了一份,再異步進行持久化處理。

這裡需要注意的是,copy on write 會消耗 2 倍的記憶體,這裡需要確定不要導緻 OOM 了。不同的場景需要考慮不同的異步 snapshot 的方式。

 Raft中存在 Raft log 和 snapshot file,

 需要檔案系統保證有狀态 

SOFAJRaft 需要儲存 Raft log 以及 snapshot file。

在容器部署時,需要確定應用使用的 Raft 目錄是持久化的。

 開啟 metrics 以及 

 利用 kill -s SIGUSR2 幫助問題分析 

在 SOFAJRaft 中,存在 node 參數 enableMetrics,是否開啟 metrics 統計名額資料。

我們将它打開,并且将名額資料輸出到一個單獨的日志檔案,歸檔的日志可以在分析問題時提供線索。

比如:有時候的讀取請求響應時間增大了,就可以通過觀察名額資料 read-index 來幫助分析是否是線性讀的機制導緻請求響應飙升。

将名額輸出到日志檔案:

Node node = ...
NodeOptions nodeOpts =  ...
//打開監控名額
nodeOpts.setEnableMetrics(true);
node.init(nodeOpts);

Slf4jReporter reporter = Slf4jReporter
         .forRegistry(node.getNodeMetrics().getMetricRegistry())
         //擷取到日志的輸出對象
         .outputTo(LoggerFactory.getLogger("com.jraft.metrics"))
         .convertRatesTo(TimeUnit.SECONDS)
         .convertDurationsTo(TimeUnit.MILLISECONDS)
         .build();
reporter.start(30, TimeUnit.SECONDS);      

除此之外,還可以利用  kill - s SIGUSR2 pid 給 SOFAJRaft 程序發送信号量,程序收到信号量後,會在程序的啟動目錄中生成名額資料資料檔案。

這裡我個人比較關注 node_describe.log 中 log manager 的 diskId 和 appliedId,前者是 Raft log 寫到磁盤中的位置,後者是狀态機目前應用到 Raft log 的位置,可以通過對比這兩個資料,用來觀察狀态機的吞吐是否正常,一旦兩者相差很多,說明狀态機出問題了。

「後續演進」

· 引入 Learner 節點,增加整個叢集的讀吞吐量。

· 持續關注社群,和社群共同發展。

- END -

以上就是 SOFAJRaft 在我們公司内的使用分享,有問題的小夥伴可以找到我的 GitHub 直接郵箱和我溝通。

感謝 SOFAStack 提供的一個如此優秀的 Java 架構。 

-

「作 者」

「參 考」

· alibaba Nacos 中關于 SOFAJRaft 的使用 

· JRaft-rheakv 中關于 SOFAJRaft 的使用