天天看點

raft-rs 示例程式源碼解讀

作者:LeoYang90

前言

raft-rs 的 5 節點示例程式稍微比較複雜一些,但是看懂的話,就會對 raft 的使用得心應手。

示例程式

Node 結構體

struct Node {
    // None if the raft is not initialized.
    raft_group: Option<RawNode<MemStorage>>,
    my_mailbox: Receiver<Message>,
    mailboxes: HashMap<u64, Sender<Message>>,
    // Key-value pairs after applied. `MemStorage` only contains raft logs,
    // so we need an additional storage engine.
    kv_pairs: HashMap<u16, String>,
}      

示例程式中,Node 是使用 RAFT 的外部應用,代表 RAFT 的一個節點應用程式,其中 raft_group 就是上一篇文章所說的 RawNode,是 RAFT 對外的接口,也就是 Node 節點内部運作的 RAFT;

my_mailbox 是 Node 接受其他 Node 資訊的視窗,mailboxes 是 Node 發送給其他 Node 資訊的視窗;

kv_pairs 是 request 最後 apply 的結果。

應用啟動

fn main() {
    const NUM_NODES: u32 = 5;
    // Create 5 mailboxes to send/receive messages. Every node holds a `Receiver` to receive
    // messages from others, and uses the respective `Sender` to send messages to others.
    let (mut tx_vec, mut rx_vec) = (Vec::new(), Vec::new());
    for _ in 0..NUM_NODES {
        let (tx, rx) = mpsc::channel();
        tx_vec.push(tx);
        rx_vec.push(rx);
    }

    // A global pending proposals queue. New proposals will be pushed back into the queue, and
    // after it's committed by the raft cluster, it will be poped from the queue.
    let proposals = Arc::new(Mutex::new(VecDeque::<Proposal>::new()));

    for (i, rx) in rx_vec.into_iter().enumerate() {
        // A map[peer_id -> sender]. In the example we create 5 nodes, with ids in [1, 5].
        let mailboxes = (1..6u64).zip(tx_vec.iter().cloned()).collect();
        let mut node = match i {
            // Peer 1 is the leader.
            0 => Node::create_raft_leader(1, rx, mailboxes, &logger),
            // Other peers are followers.
            _ => Node::create_raft_follower(rx, mailboxes),
        };
  }

  // Propose some conf changes so that followers can be initialized.
  add_all_followers(proposals.as_ref());
  
  ...
}      

從上述代碼可以看到,示例程式先建立了 5 對 channel,這些 channel 是示例程式模拟真實應用的 transport 接口。

在建立 RAFT 5 個 node 節點的時候,每個 node 節點都會選擇 5 對 channel 其中一個接收端作為自己的 my_mailbox,作為接收視窗,接收其他 peer node 節點的 msg。

然後複制全部其他的 5 個 channel 的發送端,作為 Node 節點的發送視窗,每個發送端對應一個 peer node 節點,向這些 channel 發送端發送 message,相應的 peer node 節點的 channel 接收端就會接收到消息。

create_raft

fn create_raft_leader(r
        id: u64,
        my_mailbox: Receiver<Message>,
        mailboxes: HashMap<u64, Sender<Message>>,
        logger: &slog::Logger,
    ) -> Self {
        let mut cfg = example_config();
        cfg.id = id;
        
        let mut s = Snapshot::default();
        let storage = MemStorage::new();
        storage.wl().apply_snapshot(s).unwrap();
        let raft_group = Some(RawNode::new(&cfg, storage, &logger).unwrap());
        Node {
            raft_group,
            my_mailbox,
            mailboxes,
            kv_pairs: Default::default(),
        }
    }
    
fn create_raft_follower(
        my_mailbox: Receiver<Message>,
        mailboxes: HashMap<u64, Sender<Message>>,
    ) -> Self {
        Node {
            raft_group: None,
            my_mailbox,
            mailboxes,
            kv_pairs: Default::default(),
        }
    }      

示例中 create_raft_follower 并沒有建立 RAFT,而是等待新消息進入再建立,這個我們後面再說。

現在我們成功建立了 Leader 節點和 4 個 Follower 節點,但是 Follower 節點上面并沒有運作 RAFT 程式,也就是說現在 RAFT 叢集現在隻有 Leader 節點一個,其他 Follower 節點上面沒有運作 RAFT 子產品。

接下來,我們需要要求 Leader 節點的 RAFT 程式提出配置變更要求,方法就是調用 propose 接口,并且傳入 ConfChange-AddNode 的 Msg 參數:

fn add_all_followers(proposals: &Mutex<VecDeque<Proposal>>) {
    for i in 2..6u64 {
        let mut conf_change = ConfChange::default();
        conf_change.node_id = i;
        conf_change.set_change_type(ConfChangeType::AddNode);
        loop {
            let (proposal, rx) = Proposal::conf_change(&conf_change);
            proposals.lock().unwrap().push_back(proposal);
            if rx.recv().unwrap() {
                break;
            }
            thread::sleep(Duration::from_millis(100));
        }
    }
}      

propose_conf_change

Leader 節點将 AddNode 的請求發送給内部 RAFT 程式,調用了 propose_conf_change 接口:

fn main() {
    ...
    if raft_group.raft.state == StateRole::Leader {
      // Handle new proposals.
      let mut proposals = proposals.lock().unwrap();
      for p in proposals.iter_mut().skip_while(|p| p.proposed > 0) {
        propose(raft_group, p);
      }
    }
    ...
}

fn propose(raft_group: &mut RawNode<MemStorage>, proposal: &mut Proposal) {
    let last_index1 = raft_group.raft.raft_log.last_index() + 1;
    ...
    } else if let Some(ref cc) = proposal.conf_change {
        let _ = raft_group.propose_conf_change(vec![], cc.clone());
    }
    ...

    let last_index2 = raft_group.raft.raft_log.last_index() + 1;
    if last_index2 == last_index1 {
        // Propose failed, don't forget to respond to the client.
        proposal.propose_success.send(false).unwrap();
    } else {
        proposal.proposed = last_index1;
    }
}      

ready

Leader 節點調用 propose_conf_change 後,就需要調用 ready 函數等待内部 RAFT 程式處理 Msg 完成。

值得注意的是,一般來說,如何處理 ready 函數傳回的 Ready 結構體是示例應用程式的關鍵:

fn main() {
    ...
    // Handle readies from the raft.
    on_ready(
      raft_group,
      &mut node.kv_pairs,
      &node.mailboxes,
      &proposals,
      &logger,
    );
    ...
}

fn on_ready(
    raft_group: &mut RawNode<MemStorage>,
    kv_pairs: &mut HashMap<u16, String>,
    mailboxes: &HashMap<u64, Sender<Message>>,
    proposals: &Mutex<VecDeque<Proposal>>,
    logger: &slog::Logger,
) {
    if !raft_group.has_ready() {
        return;
    }
    let store = raft_group.raft.raft_log.store.clone();

    // Get the `Ready` with `RawNode::ready` interface.
    let mut ready = raft_group.ready();

    ...
    if !ready.messages().is_empty() {
        // Send out the messages come from the node.
        handle_messages(ready.take_messages());
    }

    // Apply the snapshot. It's necessary because in `RawNode::advance` we stabilize the snapshot.
    if *ready.snapshot() != Snapshot::default() {
        let s = ready.snapshot().clone();
        if let Err(e) = store.wl().apply_snapshot(s) {
            ...
        }
    }

    ...
    // Apply all committed entries.
    handle_committed_entries(raft_group, ready.take_committed_entries());

    // Persistent raft logs. It's necessary because in `RawNode::advance` we stabilize
    // raft logs to the latest position.
    if let Err(e) = store.wl().append(ready.entries()) {
        ...
    }

    if let Some(hs) = ready.hs() {
        // Raft HardState changed, and we need to persist it.
        store.wl().set_hardstate(hs.clone());
    }

    if !ready.persisted_messages().is_empty() {
        // Send out the persisted messages come from the node.
        handle_messages(ready.take_persisted_messages());
    }

    // Call `RawNode::advance` interface to update position flags in the raft.
    let mut light_rd = raft_group.advance(ready);
    // Update commit index.
    if let Some(commit) = light_rd.commit_index() {
        store.wl().mut_hard_state().set_commit(commit);
    }
    // Send out the messages.
    handle_messages(light_rd.take_messages());
    // Apply all committed entries.
    handle_committed_entries(raft_group, light_rd.take_committed_entries());
    // Advance the apply index.
    raft_group.advance_apply();
}      
  • 調用 has_ready 函數來判斷内部 RAFT 子產品是否處理資訊完畢

<!---->

  • 調用 ready 函數擷取 Ready 結構體

<!---->

  • 擷取 Ready 結構體内部的 messages 資訊,并且調用 handle_message 函數将 Msg 發送到其他 peer node 節點。值得注意的是,按照論文來說此時隻有 Leader 才可以并行執行 Msg 發送和日志落盤。是以隻有 Leader 節點調用 ready.messages().is_empty() 才是 false,Follower 都是 true。

<!---->

  • 擷取 Ready 結構體的 snapshot,并将其應用到 RAFT 日志裡面去,落盤 snapshot

<!---->

  • 擷取 Ready 結構體已經 Committed 的 Log Entries,也就是經過大多數節點确認的消息,此時因為隻有 Leader 才運作 RAFT 子產品,是以 conf_change 這個 Log Entries 已經是 Committed Entries 的了。這時候,應用需要 apply 這些 log entries

<!---->

  • 擷取 Ready 結構體的普通 Log Entries,落盤處理

<!---->

  • 如果 hardstate 有變化,那麼需要落盤日志

<!---->

  • 由于日志已經落盤,可以擷取 Ready 結構體的 Msg 資訊發送到其他 peer node 節點,take_persisted_messages 是對 Follower 的節點起作用的,因為 Leader 已經在落盤前并行發送了 Msg

<!---->

  • 調用 advance 接口,更新 RAFT 子產品的狀态

<!---->

  • advance 接口會傳回新的 commit index,應用需要持久化到日志磁盤

<!---->

  • 由于調用 ready 和 advance 接口之間不允許調用 step、propose、campaign 等等接口,是以對于 follower 來說,light_rd.take_messages 肯定傳回空。但是對于 leader 來說,由于 ready 後又落盤了一些 Entries,如果這些 Entries 已經收到 大多數 peer node 的 msgAppendRespone,那麼 commit index 也要推進到這些 Entries 的 last index,并且需要将新的 commit index 資訊發送給 followers

<!---->

  • advance 接口會傳回新的 committed entries,應用需要繼續 apply 這些 entries

<!---->

  • 更新 RAFT 子產品的 apply index

其中,handle_messages 邏輯很簡單,就是輪詢各個 channel 的發送端,将消息發送到相應的 peer node:

let handle_messages = |msgs: Vec<Message>| {
  for msg in msgs {
    let to = msg.to;
    if mailboxes[&to].send(msg).is_err() {
      ...
    }
  }
};      

其中處理 committed entries 的邏輯也很簡單,

  • 如果 entries 的類型是 confChange 的話,就調用 RAFT 的 apply_conf_change 函數,并且落盤到日志磁盤中。因為 raft-rs 的 joint consensus 是需要 conf change entries 在 commit 後才起作用的,必須調用 apply_conf_change 函數才能進行真正的配置變更。
  • 如果 entries 的類型是普通類型的 entries 的話,就存儲到 kv_pairs 當中去。
let mut handle_committed_entries =
        |rn: &mut RawNode<MemStorage>, committed_entries: Vec<Entry>| {
          for entry in committed_entries {
            if entry.data.is_empty() {
              // From new elected leaders.
              continue;
            }
            if let EntryType::EntryConfChange = entry.get_entry_type() {
              // For conf change messages, make them effective.
              let mut cc = ConfChange::default();
              cc.merge_from_bytes(&entry.data).unwrap();
              let cs = rn.apply_conf_change(&cc).unwrap();
              store.wl().set_conf_state(cs);
            } else {
              // For normal proposals, extract the key-value pair and then
              // insert them into the kv engine.
              let data = str::from_utf8(&entry.data).unwrap();
              let reg = Regex::new("put ([0-9]+) (.+)").unwrap();
              if let Some(caps) = reg.captures(data) {
                kv_pairs.insert(caps[1].parse().unwrap(), caps[2].to_string());
              }
            }
            if rn.raft.state == StateRole::Leader {
              // The leader should response to the clients, tell them if their proposals
              // succeeded or not.
              let proposal = proposals.lock().unwrap().pop_front().unwrap();
              proposal.propose_success.send(true).unwrap();
            }
          }
};      

step

當 Leader 調用 handle_messages 函數将 msg 發送給 followers 的 channel 發送端後,followers 的 channel 接收端就會收到消息:

fn main() {
    ...
    let handle = thread::spawn(move || loop {
      thread::sleep(Duration::from_millis(10));
      loop {
        // Step raft messages.
        match node.my_mailbox.try_recv() {
          Ok(msg) => node.step(msg, &logger),
          Err(TryRecvError::Empty) => break,
          Err(TryRecvError::Disconnected) => return,
        }
      }
      ...
}
      
fn step(&mut self, msg: Message, logger: &slog::Logger) {
  if self.raft_group.is_none() {
    if is_initial_msg(&msg) {
      self.initialize_raft_from_message(&msg, logger);
    } else {
      return;
    }
  }
  let raft_group = self.raft_group.as_mut().unwrap();
  let _ = raft_group.step(msg);
}      

由于 follower 在啟動的時候并沒有建立 RAFT 子產品,是以 raft_group 是空的,這時候就會調用 initialize_raft_from_message:

fn initialize_raft_from_message(&mut self, msg: &Message, logger: &slog::Logger) {
  if !is_initial_msg(msg) {
    return;
  }
  let mut cfg = example_config();
  cfg.id = msg.to;
  let logger = logger.new(o!("tag" => format!("peer_{}", msg.to)));
  let storage = MemStorage::new();
  self.raft_group = Some(RawNode::new(&cfg, storage, &logger).unwrap());
}

fn is_initial_msg(msg: &Message) -> bool {
    let msg_type = msg.get_msg_type();
    msg_type == MessageType::MsgRequestVote
        || msg_type == MessageType::MsgRequestPreVote
        || (msg_type == MessageType::MsgHeartbeat && msg.commit == 0)
}      

直到這個時候,Leader 和 follower 才組成 5 節點的 RAFT 叢集。

propose

示例程式造出來了 100 個請求,并且讓 Leader node 通過 propose 函數發送給内部的 RAFT 子產品。

接下來,Leader node 的 on_ready 函數就會接收到 RAFT 子產品的 Ready 結構體,解析後發送相關的 msgAppend 給 followers 的 mailboxs

followers node 通過 my_mailbox 接收到請求後,會調用 step 函數傳入 follower node 内部的 RAFT 子產品。

followers node 通過 on_ready 函數接收到 RAFT 子產品的 Ready 結構體,解析後發送 msgAppendRespone 給 leader node 的 mailbox

Leader node 的 my_mailbox 接收到請求後,繼續調用 step 将消息傳入 leader 的 RAFT,RAFT 解析 msgAppendRespone 中的 index,并且更新其 committed index

leader node 的 on_ready 函數接收到 RAFT 子產品的 Ready 結構體,分析出其中的 committed entries,将其存儲到 kv_pairs,并且傳回給用戶端成功。接着還會發送 message 給 followers 的 mailboxs 最新的 commit index

followers 通過 my_maibox 收到消息後,繼續調用 step 函數傳入 RAFT,RAFT 子產品根據 message 更新自身的 commit index

followers 調用 on_ready 函數解析出 committed entries,将其存儲到自身的 kv_pairs。

至此,5 個節點的 kv_pairs 都含有使用者請求的 data 資料。

fn main() {
    let handle = thread::spawn(move || loop {
      if raft_group.raft.state == StateRole::Leader {
        // Handle new proposals.
        let mut proposals = proposals.lock().unwrap();
        for p in proposals.iter_mut().skip_while(|p| p.proposed > 0) {
          propose(raft_group, p);
        }
      }
    }
    ...
      
    (0..100u16)
      .filter(|i| {
        let (proposal, rx) = Proposal::normal(*i, "hello, world".to_owned());
        proposals.lock().unwrap().push_back(proposal);
        // After we got a response from `rx`, we can assume the put succeeded and following
        // `get` operations can find the key-value pair.
        rx.recv().unwrap()
      })
      .count();
      
      ...
}
  
fn propose(raft_group: &mut RawNode<MemStorage>, proposal: &mut Proposal) {
    ...
    if let Some((ref key, ref value)) = proposal.normal {
        let data = format!("put {} {}", key, value).into_bytes();
        let _ = raft_group.propose(vec![], data);
    }
    ...
}      

Tick

tick 比較簡單,直接調用 RAFT 子產品的 tick 接口即可。

fn main() {
  ...
  for (i, rx) in rx_vec.into_iter().enumerate() {
    ...
    let handle = thread::spawn(move || loop {
      ...
       let raft_group = match node.raft_group {
         Some(ref mut r) => r,
         // When Node::raft_group is `None` it means the node is not initialized.
         _ => continue,
      };
      ...
      if t.elapsed() >= Duration::from_millis(100) {
        // Tick the raft.
        raft_group.tick();
        t = Instant::now();
      }
      ...
    }
  }
}      

繼續閱讀