什么是Ring Benchmark?
如果在网上搜索"Ring Benchmark",大多数结果显示,这个短语出自《Programming Erlang: Software for a Concurrent World》这本书8.11.2这一小节的某个练习题。虽然我没有找到这本书,但是并不影响该我们大概理解它的含义:创建N个节点,首尾相连构成一个圈,然后在这个圈子里某个节点开始发送消息,使得消息在这个圈里面循环M圈,看看时间是多少。对应的英文描述为:
// Write a ring benchmark. Create N processes in a ring. Send a
// message round the ring M times so that a total of N * M messages
// get sent. Time how long this takes for different values of N and M.
这个例子原始应该是使用Erlang这个语言来实现的,想说明的意思也很清晰:我这个Erlang语言写这种模式的问题真的是又快又好。但是,这个例子同样可以使用rust来实现,其实现来自于官方给出的example,但是并没有相关解析,这篇日志我们就来看一下其实现过程。
Artix框架
Artix是一款基于rust开发的Actor系统框架,看上去是比较成熟的Actor系统之一,并且根据目前的英文wiki来看,这款系统还是更新时间比较近的一款。因此,这里的Ring Benchmark实现也是利用的artix框架。关于这个库具体的介绍可以在crates.io中搜索Artix主页。
依赖库说明
针对toml文件,这里非常重要,因为Artix大量依赖一个叫Future的库,而这个Future库在19年上半年发生了大版本的更新,从v0.1直接更新到v0.3,是的,查看历史版本发现,v0.2全都被舍弃了。这就导致一些原本稳定的写法变得不再适用,我草。尽管如此,写成如下这样是没问题的,我们用Future中v0.1的最后一个版本。
[dependencies]
actix = "0.8"
futures = "0.1.29"
对应的,我们需要在main.rs中列写如下的头文件。actix就是上面说的框架,env用于实现运行程序时输入环境变量的读取,SystemTime是为了计时。
use actix::*;
use std::env;
use std::time::SystemTime;
定义传递的信息类型
这里定义传递的信息类型是一个叫Payload的结构体,其中包含了一个usize类型用于记录被传递的次数,非常清晰。实际上,rust中任何类型都可以被用来当做message,我们需要做的就是为你定义的类型额外实现Message这个Trait即可。所以,这里为Payload写一个实现,其中只有一句话:使用type关键字构建类型别名,即令Result为一个空量。这句话说明收到Payload消息的Actor需要返回一个Result类型的值,这里对应返回一个空值。
struct Payload(usize);
impl Message for Payload {
type Result = ();
}
定义合格的“演员”
所谓的Actor,在这个Actix框架中需要分别实现1)结构的定义、2)Trait的实现以及3)Handler的实现。
首先,这里定义结构体Node作为Actor主体,其中包含当前Actor的ID、消息总共需要传递的上限,以及下一个Actor的地址。这里需要注意的是,它使用了一个叫做Recipient的结构体,实际上Recipient是一种特殊形式的地址,比如这里它允许向这个地址对应的Actor传递仅此Payload一种形式的Message。
接下来,针对Node实现Actor的Trait属于固定句式,其中定义了Context<Self>这样的变量别名Context。这个Context是很重要的,它对应的变量(通常为ctx)可以实现Actor各种参数设置,例如Mailbox队列长度修改、自身地址查询、Actor自我停止等。另外,如果采用同步形式的循环,可以修改为SyncContext。
struct Node {
id: usize,
limit: usize,
next: Recipient<Payload>,
}
impl Actor for Node {
type Context = Context<Self>;
}
最后,Handler实际上是Actor接收到Message之后执行的内容,这里为Node变量(也就是我们的Actor主体)实现接收到Payload这个Message之后需要进行的Handler。关于result的别名定义同Message中定义的保持相同。而handle函数的变量声明signature基本是固定的:self的可变引用、信息类型的msg、Context<Self>的可变引用(这里不涉及参数修改之类的使用,以通用符替代)。
在具体功能方面,无论哪个圆环上哪个节点接收到了信息,都要确认一下是否需要停止(消息转够了次数),这就是为什么我们在每个Actor里面都配备了limit上限这个变量。如果达到了,我们把整个运行系统关闭,就可以停止了。另一个要做的事情,是向下一个自己地址库的下一个地址再次发送这个Payload,并对里面的计数器加一。这里我们使用地址类型里面do_send这个函数,实际上是比较激进的,因为这个send是直接bypass接收人mailbox的存在。
impl Handler<Payload> for Node {
type Result = ();
fn handle(&mut self, msg: Payload, _: &mut Context<Self>) {
if msg.0 >= self.limit {
println!(
"Actor {} reached limit of {} (payload was {})",
self.id, self.limit, msg.0
);
System::current().stop();
return;
}
self.next
.do_send(Payload(msg.0 + 1))
.expect("Unable to send payload");
}
}
如何扣动“扳机”
你看,基本的Actor配置就是上面那样,实际上我们制定了规则,而规则对于每一个Actor都是一视同仁的。这将是多么美好的一个世界。现在只需要伸出手,扣动扳机即可(运行main函数)。
作为main函数,有一些必要的事情需要我们完成,比如在命令行运行程序的时候,额外读取两个变量,分别为节点个数n_nodes ,以及message传递多少圈n_times。相关部分如下所示,这里为了表述清晰,我们把main函数切分为二,请不要责怪我“丢失了后半个}”。
这段代码其实很清晰,我只是用collect这个函数,从命令行里面读取了一些string类型的数据,然后用parse这个函数将string转化为usize类型,而已。那么我们怎么使用获得的n_nodes和n_times呢,乘一下就可以了…这就是我们message总共需要传递的次数。
// main
fn main() -> std::io::Result<()> {
let args: Vec<String> = env::args().collect();
if args.len() < 3 {
print_usage_and_exit();
}
let n_nodes = if let Ok(arg_num_nodes) = args[1].parse::<usize>() {
if arg_num_nodes <= 1 {
eprintln!("Number of nodes must be > 1");
::std::process::exit(1);
}
arg_num_nodes
} else {
print_usage_and_exit();
};
let n_times = if let Ok(arg_ntimes) = args[2].parse::<usize>() {
arg_ntimes
} else {
print_usage_and_exit()
};
let limit = n_nodes * n_times;
现在迎来我们的重头戏,如何触发我们的“扳机”呢。实际上这部分短小精悍,一个闭包就搞定了。注意,我们即可以用Actor::start()开始Actor的运行,也可以用 Actor::create()开始Actor的运行。区别在于,start可以使得actor立刻运行,而creat通常适用于我们需要事先获取context目标的情况,也就是说两者的内部函数结构不同,这里外部使用creat,内部使用start。
这个闭包输入是一个叫ctx的变量,研究creat这个函数内部可以发现,这个ctx实际上是通过Context::new()而生成的Context<Self>类型,而这个context属于node变量。对应的ctx.address实际上node这个节点的地址(这里实际上是最后一个节点),这里将其绑定给了first_addr,然后又给了id为1的节点的next变量。
之后,id为1的节点通过start运行,获得自己的context。与此同时,prev_addr获得了第一个节点的地址。接着,我们运行第2、3…直到最后一个之前的节点。最后一个最为不同,它需要被闭包返回。实际上在一般的start中,我们只需要执行Node结构构建和run就好了,但是在creat里面,我们额外实现了点与点之间的串联,所以才搞得比较晦涩。
let system = System::new("ring");
let node = Node::create(move |ctx| {
let first_addr = ctx.address();
let mut prev_addr = Node {
id: 1,
limit,
next: first_addr.recipient(),
}
.start();
for id in 2..n_nodes {
prev_addr = Node {
id,
limit,
next: prev_addr.recipient(),
}
.start();
}
Node {
id: n_nodes,
limit,
next: prev_addr.recipient(),
}
});
为什么我们说Node返回的是最后一个节点呢?让我们运行程序,就可以看到如下输出(相关的输出我在代码里删掉了,因为它不影响正常逻辑),可以看到,id1的actor是最先激活的,而最后的id3的actor是最后激活的,这也印证了我们上面的逻辑。
C:\Users\xxx\RustProjects\xxx>cargo run 3 2
Compiling xxx v0.1.0 (C:\Users\xxx\RustProjects\xxx)
Finished dev [unoptimized + debuginfo] target(s) in 5.94s
Running `target\debug\xxx.exe 3 2`
Setting up 3 nodes
Sending start message and waiting for termination after 6 messages...
Actor 1 is alive
Actor 2 is alive
Actor 3 is alive
下面这部分就好理解了,仅仅是在对node发送第一个send,并捎带手在执行下一次发送的时候,实现了计时功能而已。也就是说,实际上是最后一个id的节点执行了发送。对了,注意system.run()实际上被用来判断,上面的Actor循环有没有运行完。
println!(
"Sending start message and waiting for termination after {} messages...",
limit
);
let now = SystemTime::now();
let _req = node.send(Payload(1));
match system.run() {
Ok(_) => println!("Completed"),
Err(e) => println!("An error occured: {:?}", e),
}
match now.elapsed() {
Ok(elapsed) => println!(
"Time taken: {}.{:06} seconds",
elapsed.as_secs(),
elapsed.subsec_micros(),
),
Err(e) => println!("An error occured: {:?}", e),
}
Ok(())
}
最后,补上main函数里面用到的一个报错说明,因为这个程序是要在运行的时候输入两个变量的。
// Info
fn print_usage_and_exit() -> ! {
eprintln!("Usage; actix-test <num-nodes> <num-times-message-around-ring>");
::std::process::exit(1);
}
小结
好的,事已至此,你只需要将以上的代码块合并起来,就可以得到完整的main.rs文件了。最后,我们给出全部的输出结果。可以看到,激活过程中,顺序是1、2、3,而发送message的过程中,顺序是3、2、1,因为每一个节点的next存的,都是前一个节点,并且我们从最后一个node开始发送。
C:\Users\xxx\RustProjects\xxx>cargo run 3 2
Compiling xxx v0.1.0 (C:\Users\xxx\RustProjects\xxx)
Finished dev [unoptimized + debuginfo] target(s) in 5.94s
Running `target\debug\xxx.exe 3 2`
Setting up 3 nodes
Sending start message and waiting for termination after 6 messages...
Actor 1 is alive
Actor 2 is alive
Actor 3 is alive
Actor 3 received message 1 of 6 (16.67%)
Actor 2 received message 2 of 6 (33.33%)
Actor 1 received message 3 of 6 (50.00%)
Actor 3 received message 4 of 6 (66.67%)
Actor 2 received message 5 of 6 (83.33%)
Actor 1 reached limit of 6 (payload was 6)
Completed
Time taken: 0.010000 seconds
参考文献
Axtic Quickstart
actix/examples/ring代码