天天看点

001 Rust 异步编程,Why Async

前言

我们之前已经学习过Rust编程基础相关的一些知识,下面进入到Rust异步编程的学习,本节主要介绍为什么需要Rust异步编程。

场景说明

假定有一个客户端,需要从2个不同的网站进行下载任务,我们用Rust程序模拟下载过程,并说明为什么需要Rust异步编程。

服务端的实现

Server1

use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};
use std::thread;
use std::time;

fn handle_client(mut stream: TcpStream, wait_time: u64) -> std::io::Result<()> {
    let mut buf = [0; 512];
    loop {
        let bytes_read = stream.read(&mut buf)?;
        if bytes_read == 0 {
            return Ok(());
        }

        thread::sleep(time::Duration::from_secs(wait_time));
        stream.write(&buf[..bytes_read])?;
        stream.write(&("\n".as_bytes()))?;
    }    
}

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;

    for stream in listener.incoming() {
        handle_client(stream?, 20)?;
    }

    Ok(())
}      

server2

use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};
use std::thread;
use std::time;

fn handle_client(mut stream: TcpStream, wait_time: u64) -> std::io::Result<()> {
    let mut buf = [0; 512];
    loop {
        let bytes_read = stream.read(&mut buf)?;
        if bytes_read == 0 {
            return Ok(());
        }

        thread::sleep(time::Duration::from_secs(wait_time));
        stream.write(&buf[..bytes_read])?;
        stream.write(&("\n".as_bytes()))?;
    }    
}

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8081")?;

    for stream in listener.incoming() {
        handle_client(stream?, 10)?;
    }

    Ok(())
}      

client

迭代一

use std::net::TcpStream;
use std::io::{ prelude::*, BufReader, Write };
use std::str;

fn use_server(server: &str, port: u16, content: &str) -> std::io::Result<()> {
    let mut stream = TcpStream::connect((server, port))?;
    let _ = stream.write(content.as_bytes())?;

    let mut reader = BufReader::new(&stream);
    let mut buffer: Vec<u8> = Vec::new();
    reader.read_until(b'\n', &mut buffer)?;

    println!("recv from server: {} ", str::from_utf8(&buffer).unwrap());
    Ok(())
}

//迭代一
fn main() -> std::io::Result<()> {
    use_server("127.0.0.1", 8080, "use server 127.0.0.1:8080")?;
    use_server("127.0.0.1", 8081, "use server 127.0.0.1:8081")?;

    Ok(())
}      

程序存在的问题:

必须从第一个网站下载完成后才能从第二个网站进行下载,不符合实际下载情况。      

迭代二

针对迭代一中的问题,解决办法如下:

使用多线程实现,每个下载任务都起一个线程。      

代码如下:

use std::net::TcpStream;
use std::io::{ prelude::*, BufReader, Write };
use std::str;
use std::thread;

fn use_server(server: &str, port: u16, content: &str) -> std::io::Result<()> {
    let mut stream = TcpStream::connect((server, port))?;
    let _ = stream.write(content.as_bytes())?;

    let mut reader = BufReader::new(&stream);
    let mut buffer: Vec<u8> = Vec::new();
    reader.read_until(b'\n', &mut buffer)?;

    println!("recv from server: {} ", str::from_utf8(&buffer).unwrap());
    Ok(())
}

迭代二
fn main() -> std::io::Result<()> {
    let mut handles: Vec<thread::JoinHandle<()>> = Vec::new(); 
    let handle = thread::spawn(move || {
        use_server("127.0.0.1", 8080, "use server 127.0.0.1:8080").unwrap();
    });
    handles.push(handle);
    
    let handle = thread::spawn(move || {
        use_server("127.0.0.1", 8081, "use server 127.0.0.1:8081").unwrap();
    });
    handles.push(handle);

    for handle in handles {
        handle.join().unwrap();
    }
    Ok(())
 }      

程序存在问题:

1、如果有几千几万个任务,每个任务都起一个线程,怎么办?
2、如果使用线程池解决,但是我们每个下载任务其实都是发起请求后在等待server的响应,效率还是不够高。那么有没有什么办法解决这些问题?      

迭代三

针对迭代二中的问题,可以考虑用异步编程来解决,即不需要起多个线程,每个任务就绪后再执行,修改过程如下:

  • 配置文件Cargo.toml
[dependencies]
  futures = "0.3.4"      
  • 源码:
use std::net::TcpStream;
use std::io::{ prelude::*, BufReader, Write };
use std::str;
use futures::join;
use futures::executor;
fn use_server(server: &str, port: u16, content: &str) -> std::io::Result<()> {
    let mut stream = TcpStream::connect((server, port))?;
    let _ = stream.write(content.as_bytes())?;

    let mut reader = BufReader::new(&stream);
    let mut buffer: Vec<u8> = Vec::new();
    reader.read_until(b'\n', &mut buffer)?;

    println!("recv from server: {} ", str::from_utf8(&buffer).unwrap());
    Ok(())
}
//迭代三
async fn async_use_server(server: &str, port: u16, content: &str) {
    use_server(server, port, content).unwrap();
}
async fn use_all_server() {
    let f1 = async_use_server("127.0.0.1", 8080, "use server 127.0.0.1:8080");
    let f2 = async_use_server("127.0.0.1", 8081, "use server 127.0.0.1:8081");
    join!(f1, f2);
}
fn main() -> std::io::Result<()> {
    let f = use_all_server();
    executor::block_on(f);
    Ok(())
}      

通过Rust异步编程,我们期望能够达到不使用多线程达到迭代二中多线程一样的效果。