天天看點

利用Rust語言+Actix庫實作的"Ring Benchmark"

什麼是Ring Benchmark?

如果在網上搜尋"Ring Benchmark",大多數結果顯示,這個短語出自《Programming Erlang: Software for a Concurrent World》這本書8.11.2這一小節的某個練習題。雖然我沒有找到這本書,但是并不影響該我們大概了解它的含義:建立N個節點,首尾相連構成一個圈,然後在這個圈子裡某個節點開始發送消息,使得消息在這個圈裡面循環M圈,看看時間是多少。對應的英文描述為:

// Write a ring benchmark. Create N processes in a ring. Send a
// message round the ring M times so that a total of N * M messages
// get sent. Time how long this takes for different values of N and M.
           

這個例子原始應該是使用Erlang這個語言來實作的,想說明的意思也很清晰:我這個Erlang語言寫這種模式的問題真的是又快又好。但是,這個例子同樣可以使用rust來實作,其實作來自于官方給出的example,但是并沒有相關解析,這篇日志我們就來看一下其實作過程。

Artix架構

Artix是一款基于rust開發的Actor系統架構,看上去是比較成熟的Actor系統之一,并且根據目前的英文wiki來看,這款系統還是更新時間比較近的一款。是以,這裡的Ring Benchmark實作也是利用的artix架構。關于這個庫具體的介紹可以在crates.io中搜尋Artix首頁。

依賴庫說明

針對toml檔案,這裡非常重要,因為Artix大量依賴一個叫Future的庫,而這個Future庫在19年上半年發生了大版本的更新,從v0.1直接更新到v0.3,是的,檢視曆史版本發現,v0.2全都被舍棄了。這就導緻一些原本穩定的寫法變得不再适用,我草。盡管如此,寫成如下這樣是沒問題的,我們用Future中v0.1的最後一個版本。

[dependencies]
actix = "0.8"
futures = "0.1.29"
           

對應的,我們需要在main.rs中列寫如下的頭檔案。actix就是上面說的架構,env用于實作運作程式時輸入環境變量的讀取,SystemTime是為了計時。

use actix::*;
use std::env;
use std::time::SystemTime;
           
定義傳遞的資訊類型

這裡定義傳遞的資訊類型是一個叫Payload的結構體,其中包含了一個usize類型用于記錄被傳遞的次數,非常清晰。實際上,rust中任何類型都可以被用來當做message,我們需要做的就是為你定義的類型額外實作Message這個Trait即可。是以,這裡為Payload寫一個實作,其中隻有一句話:使用type關鍵字建構類型别名,即令Result為一個空量。這句話說明收到Payload消息的Actor需要傳回一個Result類型的值,這裡對應傳回一個空值。

struct Payload(usize);

impl Message for Payload {
    type Result = ();
}
           
定義合格的“演員”

所謂的Actor,在這個Actix架構中需要分别實作1)結構的定義、2)Trait的實作以及3)Handler的實作。

首先,這裡定義結構體Node作為Actor主體,其中包含目前Actor的ID、消息總共需要傳遞的上限,以及下一個Actor的位址。這裡需要注意的是,它使用了一個叫做Recipient的結構體,實際上Recipient是一種特殊形式的位址,比如這裡它允許向這個位址對應的Actor傳遞僅此Payload一種形式的Message。

接下來,針對Node實作Actor的Trait屬于固定句式,其中定義了Context<Self>這樣的變量别名Context。這個Context是很重要的,它對應的變量(通常為ctx)可以實作Actor各種參數設定,例如Mailbox隊列長度修改、自身位址查詢、Actor自我停止等。另外,如果采用同步形式的循環,可以修改為SyncContext。

struct Node {
    id: usize,
    limit: usize,
    next: Recipient<Payload>,
}

impl Actor for Node {
    type Context = Context<Self>;
}
           

最後,Handler實際上是Actor接收到Message之後執行的内容,這裡為Node變量(也就是我們的Actor主體)實作接收到Payload這個Message之後需要進行的Handler。關于result的别名定義同Message中定義的保持相同。而handle函數的變量聲明signature基本是固定的:self的可變引用、資訊類型的msg、Context<Self>的可變引用(這裡不涉及參數修改之類的使用,以通用符替代)。

在具體功能方面,無論哪個圓環上哪個節點接收到了資訊,都要确認一下是否需要停止(消息轉夠了次數),這就是為什麼我們在每個Actor裡面都配備了limit上限這個變量。如果達到了,我們把整個運作系統關閉,就可以停止了。另一個要做的事情,是向下一個自己位址庫的下一個位址再次發送這個Payload,并對裡面的計數器加一。這裡我們使用位址類型裡面do_send這個函數,實際上是比較激進的,因為這個send是直接bypass接收人mailbox的存在。

impl Handler<Payload> for Node {
    type Result = ();

    fn handle(&mut self, msg: Payload, _: &mut Context<Self>) {
        if msg.0 >= self.limit {
            println!(
                "Actor {} reached limit of {} (payload was {})",
                self.id, self.limit, msg.0
            );
            System::current().stop();
            return;
        }
       
        self.next
            .do_send(Payload(msg.0 + 1))
            .expect("Unable to send payload");
    }
}
           
如何扣動“扳機”

你看,基本的Actor配置就是上面那樣,實際上我們制定了規則,而規則對于每一個Actor都是一視同仁的。這将是多麼美好的一個世界。現在隻需要伸出手,扣動扳機即可(運作main函數)。

作為main函數,有一些必要的事情需要我們完成,比如在指令行運作程式的時候,額外讀取兩個變量,分别為節點個數n_nodes ,以及message傳遞多少圈n_times。相關部分如下所示,這裡為了表述清晰,我們把main函數切分為二,請不要責怪我“丢失了後半個}”。

這段代碼其實很清晰,我隻是用collect這個函數,從指令行裡面讀取了一些string類型的資料,然後用parse這個函數将string轉化為usize類型,而已。那麼我們怎麼使用獲得的n_nodes和n_times呢,乘一下就可以了…這就是我們message總共需要傳遞的次數。

// main
fn main() -> std::io::Result<()> {   
    let args: Vec<String> = env::args().collect();

    if args.len() < 3 {
        print_usage_and_exit();
    }

    let n_nodes = if let Ok(arg_num_nodes) = args[1].parse::<usize>() {
        if arg_num_nodes <= 1 {
            eprintln!("Number of nodes must be > 1");
            ::std::process::exit(1);
        }
        arg_num_nodes
    } else {
        print_usage_and_exit();
    };

    let n_times = if let Ok(arg_ntimes) = args[2].parse::<usize>() {
        arg_ntimes
    } else {
        print_usage_and_exit()
    };
    
    let limit = n_nodes * n_times;
           

現在迎來我們的重頭戲,如何觸發我們的“扳機”呢。實際上這部分短小精悍,一個閉包就搞定了。注意,我們即可以用Actor::start()開始Actor的運作,也可以用 Actor::create()開始Actor的運作。差別在于,start可以使得actor立刻運作,而creat通常适用于我們需要事先擷取context目标的情況,也就是說兩者的内部函數結構不同,這裡外部使用creat,内部使用start。

這個閉包輸入是一個叫ctx的變量,研究creat這個函數内部可以發現,這個ctx實際上是通過Context::new()而生成的Context<Self>類型,而這個context屬于node變量。對應的ctx.address實際上node這個節點的位址(這裡實際上是最後一個節點),這裡将其綁定給了first_addr,然後又給了id為1的節點的next變量。

之後,id為1的節點通過start運作,獲得自己的context。與此同時,prev_addr獲得了第一個節點的位址。接着,我們運作第2、3…直到最後一個之前的節點。最後一個最為不同,它需要被閉包傳回。實際上在一般的start中,我們隻需要執行Node結構建構和run就好了,但是在creat裡面,我們額外實作了點與點之間的串聯,是以才搞得比較晦澀。

let system = System::new("ring");

    let node = Node::create(move |ctx| {
        let first_addr = ctx.address();
        let mut prev_addr = Node {
            id: 1,
            limit,
            next: first_addr.recipient(),
        }
        .start();

        for id in 2..n_nodes {
            prev_addr = Node {
                id,
                limit,
                next: prev_addr.recipient(),
            }
            .start();
        }

        Node {
            id: n_nodes,
            limit,
            next: prev_addr.recipient(),
        }
    });

           

為什麼我們說Node傳回的是最後一個節點呢?讓我們運作程式,就可以看到如下輸出(相關的輸出我在代碼裡删掉了,因為它不影響正常邏輯),可以看到,id1的actor是最先激活的,而最後的id3的actor是最後激活的,這也印證了我們上面的邏輯。

C:\Users\xxx\RustProjects\xxx>cargo run 3 2
   Compiling xxx v0.1.0 (C:\Users\xxx\RustProjects\xxx)
    Finished dev [unoptimized + debuginfo] target(s) in 5.94s
     Running `target\debug\xxx.exe 3 2`
Setting up 3 nodes
Sending start message and waiting for termination after 6 messages...
Actor 1 is alive
Actor 2 is alive
Actor 3 is alive
           

下面這部分就好了解了,僅僅是在對node發送第一個send,并捎帶手在執行下一次發送的時候,實作了計時功能而已。也就是說,實際上是最後一個id的節點執行了發送。對了,注意system.run()實際上被用來判斷,上面的Actor循環有沒有運作完。

println!(
        "Sending start message and waiting for termination after {} messages...",
        limit
    );

    let now = SystemTime::now();

    let _req = node.send(Payload(1));

    match system.run() {
        Ok(_) => println!("Completed"),
        Err(e) => println!("An error occured: {:?}", e),
    }

    match now.elapsed() {
        Ok(elapsed) => println!(
            "Time taken: {}.{:06} seconds",
            elapsed.as_secs(),
            elapsed.subsec_micros(),
        ),
        Err(e) => println!("An error occured: {:?}", e),
    }

    Ok(())
}
           

最後,補上main函數裡面用到的一個報錯說明,因為這個程式是要在運作的時候輸入兩個變量的。

// Info
fn print_usage_and_exit() -> ! {
    eprintln!("Usage; actix-test <num-nodes> <num-times-message-around-ring>");
    ::std::process::exit(1);
}
           
小結

好的,事已至此,你隻需要将以上的代碼塊合并起來,就可以得到完整的main.rs檔案了。最後,我們給出全部的輸出結果。可以看到,激活過程中,順序是1、2、3,而發送message的過程中,順序是3、2、1,因為每一個節點的next存的,都是前一個節點,并且我們從最後一個node開始發送。

C:\Users\xxx\RustProjects\xxx>cargo run 3 2
   Compiling xxx v0.1.0 (C:\Users\xxx\RustProjects\xxx)
    Finished dev [unoptimized + debuginfo] target(s) in 5.94s
     Running `target\debug\xxx.exe 3 2`
Setting up 3 nodes
Sending start message and waiting for termination after 6 messages...
Actor 1 is alive
Actor 2 is alive
Actor 3 is alive
Actor 3 received message 1 of 6 (16.67%)
Actor 2 received message 2 of 6 (33.33%)
Actor 1 received message 3 of 6 (50.00%)
Actor 3 received message 4 of 6 (66.67%)
Actor 2 received message 5 of 6 (83.33%)
Actor 1 reached limit of 6 (payload was 6)
Completed
Time taken: 0.010000 seconds
           
參考文獻

Axtic Quickstart

actix/examples/ring代碼