天天看点

利用Rust语言+Actix库实现的"Ring Benchmark"

什么是Ring Benchmark?

如果在网上搜索"Ring Benchmark",大多数结果显示,这个短语出自《Programming Erlang: Software for a Concurrent World》这本书8.11.2这一小节的某个练习题。虽然我没有找到这本书,但是并不影响该我们大概理解它的含义:创建N个节点,首尾相连构成一个圈,然后在这个圈子里某个节点开始发送消息,使得消息在这个圈里面循环M圈,看看时间是多少。对应的英文描述为:

// Write a ring benchmark. Create N processes in a ring. Send a
// message round the ring M times so that a total of N * M messages
// get sent. Time how long this takes for different values of N and M.
           

这个例子原始应该是使用Erlang这个语言来实现的,想说明的意思也很清晰:我这个Erlang语言写这种模式的问题真的是又快又好。但是,这个例子同样可以使用rust来实现,其实现来自于官方给出的example,但是并没有相关解析,这篇日志我们就来看一下其实现过程。

Artix框架

Artix是一款基于rust开发的Actor系统框架,看上去是比较成熟的Actor系统之一,并且根据目前的英文wiki来看,这款系统还是更新时间比较近的一款。因此,这里的Ring Benchmark实现也是利用的artix框架。关于这个库具体的介绍可以在crates.io中搜索Artix主页。

依赖库说明

针对toml文件,这里非常重要,因为Artix大量依赖一个叫Future的库,而这个Future库在19年上半年发生了大版本的更新,从v0.1直接更新到v0.3,是的,查看历史版本发现,v0.2全都被舍弃了。这就导致一些原本稳定的写法变得不再适用,我草。尽管如此,写成如下这样是没问题的,我们用Future中v0.1的最后一个版本。

[dependencies]
actix = "0.8"
futures = "0.1.29"
           

对应的,我们需要在main.rs中列写如下的头文件。actix就是上面说的框架,env用于实现运行程序时输入环境变量的读取,SystemTime是为了计时。

use actix::*;
use std::env;
use std::time::SystemTime;
           
定义传递的信息类型

这里定义传递的信息类型是一个叫Payload的结构体,其中包含了一个usize类型用于记录被传递的次数,非常清晰。实际上,rust中任何类型都可以被用来当做message,我们需要做的就是为你定义的类型额外实现Message这个Trait即可。所以,这里为Payload写一个实现,其中只有一句话:使用type关键字构建类型别名,即令Result为一个空量。这句话说明收到Payload消息的Actor需要返回一个Result类型的值,这里对应返回一个空值。

struct Payload(usize);

impl Message for Payload {
    type Result = ();
}
           
定义合格的“演员”

所谓的Actor,在这个Actix框架中需要分别实现1)结构的定义、2)Trait的实现以及3)Handler的实现。

首先,这里定义结构体Node作为Actor主体,其中包含当前Actor的ID、消息总共需要传递的上限,以及下一个Actor的地址。这里需要注意的是,它使用了一个叫做Recipient的结构体,实际上Recipient是一种特殊形式的地址,比如这里它允许向这个地址对应的Actor传递仅此Payload一种形式的Message。

接下来,针对Node实现Actor的Trait属于固定句式,其中定义了Context<Self>这样的变量别名Context。这个Context是很重要的,它对应的变量(通常为ctx)可以实现Actor各种参数设置,例如Mailbox队列长度修改、自身地址查询、Actor自我停止等。另外,如果采用同步形式的循环,可以修改为SyncContext。

struct Node {
    id: usize,
    limit: usize,
    next: Recipient<Payload>,
}

impl Actor for Node {
    type Context = Context<Self>;
}
           

最后,Handler实际上是Actor接收到Message之后执行的内容,这里为Node变量(也就是我们的Actor主体)实现接收到Payload这个Message之后需要进行的Handler。关于result的别名定义同Message中定义的保持相同。而handle函数的变量声明signature基本是固定的:self的可变引用、信息类型的msg、Context<Self>的可变引用(这里不涉及参数修改之类的使用,以通用符替代)。

在具体功能方面,无论哪个圆环上哪个节点接收到了信息,都要确认一下是否需要停止(消息转够了次数),这就是为什么我们在每个Actor里面都配备了limit上限这个变量。如果达到了,我们把整个运行系统关闭,就可以停止了。另一个要做的事情,是向下一个自己地址库的下一个地址再次发送这个Payload,并对里面的计数器加一。这里我们使用地址类型里面do_send这个函数,实际上是比较激进的,因为这个send是直接bypass接收人mailbox的存在。

impl Handler<Payload> for Node {
    type Result = ();

    fn handle(&mut self, msg: Payload, _: &mut Context<Self>) {
        if msg.0 >= self.limit {
            println!(
                "Actor {} reached limit of {} (payload was {})",
                self.id, self.limit, msg.0
            );
            System::current().stop();
            return;
        }
       
        self.next
            .do_send(Payload(msg.0 + 1))
            .expect("Unable to send payload");
    }
}
           
如何扣动“扳机”

你看,基本的Actor配置就是上面那样,实际上我们制定了规则,而规则对于每一个Actor都是一视同仁的。这将是多么美好的一个世界。现在只需要伸出手,扣动扳机即可(运行main函数)。

作为main函数,有一些必要的事情需要我们完成,比如在命令行运行程序的时候,额外读取两个变量,分别为节点个数n_nodes ,以及message传递多少圈n_times。相关部分如下所示,这里为了表述清晰,我们把main函数切分为二,请不要责怪我“丢失了后半个}”。

这段代码其实很清晰,我只是用collect这个函数,从命令行里面读取了一些string类型的数据,然后用parse这个函数将string转化为usize类型,而已。那么我们怎么使用获得的n_nodes和n_times呢,乘一下就可以了…这就是我们message总共需要传递的次数。

// main
fn main() -> std::io::Result<()> {   
    let args: Vec<String> = env::args().collect();

    if args.len() < 3 {
        print_usage_and_exit();
    }

    let n_nodes = if let Ok(arg_num_nodes) = args[1].parse::<usize>() {
        if arg_num_nodes <= 1 {
            eprintln!("Number of nodes must be > 1");
            ::std::process::exit(1);
        }
        arg_num_nodes
    } else {
        print_usage_and_exit();
    };

    let n_times = if let Ok(arg_ntimes) = args[2].parse::<usize>() {
        arg_ntimes
    } else {
        print_usage_and_exit()
    };
    
    let limit = n_nodes * n_times;
           

现在迎来我们的重头戏,如何触发我们的“扳机”呢。实际上这部分短小精悍,一个闭包就搞定了。注意,我们即可以用Actor::start()开始Actor的运行,也可以用 Actor::create()开始Actor的运行。区别在于,start可以使得actor立刻运行,而creat通常适用于我们需要事先获取context目标的情况,也就是说两者的内部函数结构不同,这里外部使用creat,内部使用start。

这个闭包输入是一个叫ctx的变量,研究creat这个函数内部可以发现,这个ctx实际上是通过Context::new()而生成的Context<Self>类型,而这个context属于node变量。对应的ctx.address实际上node这个节点的地址(这里实际上是最后一个节点),这里将其绑定给了first_addr,然后又给了id为1的节点的next变量。

之后,id为1的节点通过start运行,获得自己的context。与此同时,prev_addr获得了第一个节点的地址。接着,我们运行第2、3…直到最后一个之前的节点。最后一个最为不同,它需要被闭包返回。实际上在一般的start中,我们只需要执行Node结构构建和run就好了,但是在creat里面,我们额外实现了点与点之间的串联,所以才搞得比较晦涩。

let system = System::new("ring");

    let node = Node::create(move |ctx| {
        let first_addr = ctx.address();
        let mut prev_addr = Node {
            id: 1,
            limit,
            next: first_addr.recipient(),
        }
        .start();

        for id in 2..n_nodes {
            prev_addr = Node {
                id,
                limit,
                next: prev_addr.recipient(),
            }
            .start();
        }

        Node {
            id: n_nodes,
            limit,
            next: prev_addr.recipient(),
        }
    });

           

为什么我们说Node返回的是最后一个节点呢?让我们运行程序,就可以看到如下输出(相关的输出我在代码里删掉了,因为它不影响正常逻辑),可以看到,id1的actor是最先激活的,而最后的id3的actor是最后激活的,这也印证了我们上面的逻辑。

C:\Users\xxx\RustProjects\xxx>cargo run 3 2
   Compiling xxx v0.1.0 (C:\Users\xxx\RustProjects\xxx)
    Finished dev [unoptimized + debuginfo] target(s) in 5.94s
     Running `target\debug\xxx.exe 3 2`
Setting up 3 nodes
Sending start message and waiting for termination after 6 messages...
Actor 1 is alive
Actor 2 is alive
Actor 3 is alive
           

下面这部分就好理解了,仅仅是在对node发送第一个send,并捎带手在执行下一次发送的时候,实现了计时功能而已。也就是说,实际上是最后一个id的节点执行了发送。对了,注意system.run()实际上被用来判断,上面的Actor循环有没有运行完。

println!(
        "Sending start message and waiting for termination after {} messages...",
        limit
    );

    let now = SystemTime::now();

    let _req = node.send(Payload(1));

    match system.run() {
        Ok(_) => println!("Completed"),
        Err(e) => println!("An error occured: {:?}", e),
    }

    match now.elapsed() {
        Ok(elapsed) => println!(
            "Time taken: {}.{:06} seconds",
            elapsed.as_secs(),
            elapsed.subsec_micros(),
        ),
        Err(e) => println!("An error occured: {:?}", e),
    }

    Ok(())
}
           

最后,补上main函数里面用到的一个报错说明,因为这个程序是要在运行的时候输入两个变量的。

// Info
fn print_usage_and_exit() -> ! {
    eprintln!("Usage; actix-test <num-nodes> <num-times-message-around-ring>");
    ::std::process::exit(1);
}
           
小结

好的,事已至此,你只需要将以上的代码块合并起来,就可以得到完整的main.rs文件了。最后,我们给出全部的输出结果。可以看到,激活过程中,顺序是1、2、3,而发送message的过程中,顺序是3、2、1,因为每一个节点的next存的,都是前一个节点,并且我们从最后一个node开始发送。

C:\Users\xxx\RustProjects\xxx>cargo run 3 2
   Compiling xxx v0.1.0 (C:\Users\xxx\RustProjects\xxx)
    Finished dev [unoptimized + debuginfo] target(s) in 5.94s
     Running `target\debug\xxx.exe 3 2`
Setting up 3 nodes
Sending start message and waiting for termination after 6 messages...
Actor 1 is alive
Actor 2 is alive
Actor 3 is alive
Actor 3 received message 1 of 6 (16.67%)
Actor 2 received message 2 of 6 (33.33%)
Actor 1 received message 3 of 6 (50.00%)
Actor 3 received message 4 of 6 (66.67%)
Actor 2 received message 5 of 6 (83.33%)
Actor 1 reached limit of 6 (payload was 6)
Completed
Time taken: 0.010000 seconds
           
参考文献

Axtic Quickstart

actix/examples/ring代码