天天看點

Rust網絡程式設計架構-Tokio進階

我們在上文《小朋友也能聽懂的Rust網絡程式設計架構知識-Tokio基礎篇》對于Tokio的基礎知識進行了一下初步的介紹,本文就對于Tokio的用法及原理進行進一步的介紹與說明。

目前市面上絕大多數程式設計語言所編寫的程式,執行程式與代碼編寫順序完全相同,當然有的讀者可能會提到CPU的亂序執行機制,但亂序執行從本質上講還是順序送出的,程式在第一行執行完成之後再去執行下一行,并以此類推,是通用的程式設計模式。

在這種傳統的式程式設計範式中,當程式遇到耗時操作時,會一直阻塞直到操作完成。比如建立TCP連接配接可能需要與網絡上的對端節點進行若幹次握手,這可能會花費相當多的時間。在此期間,線程被阻塞而無法完成其它操作。

在傳統的程式設計範式中往往使用回調機制來進行資源調配的優化,對于不能立即完成的操作将被挂起到背景,這種情況下線程不會被阻塞,可以繼續執行其它任務。一旦操作完成,該任務的回調函數将被調用,進而使任務最終完成。盡管回調模式可以帶來使應用程式的效率更高,但也會導緻程式更複雜。開發者需要跟蹤異步操作完成後恢複工作所需的所有狀态,從我的經驗來看,這是一項特别乏味而且極容易出錯的工作任務。

為什麼需要異步調用

以下例程部分依賴于mini-redis子產品在執行了cargo install mini-redis之後,并在Cargo.toml最後加入以下配置項之後,

tokio = { version = "1", features = ["full"] }
mini-redis = "0.4"      
Rust網絡程式設計架構-Tokio進階

即可順利執行下列代碼:

use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};
#[tokio::main]
async fn main() {
// 綁定端口
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    loop {
        // 監控端口消息,對于每個socket請求,都啟動一個folk程序,進行處理
        let (socket, _) = listener.accept().await.unwrap();
        Process(socket).await;
    }
}
async fn Process(socket: TcpStream) {
    let mut connection = Connection::new(socket);
    if let Some(frame) = connection.read_frame().await.unwrap() {
        println!("WE GOT: {:?}", frame);
        let response = Frame::Error("not finished".to_string());
        connection.write_frame(&response).await.unwrap();
    }
}      
Rust網絡程式設計架構-Tokio進階

以上代碼可能我們在其它語言程式設計中所經常遇到的,對于每個Socket連接配接都通過一個線程來處理(當然這裡隻是以Rust為例說明,在Tokio中不推薦這種做法,我也就沒有另行啟動線程)并且最關鍵的一點是process(socket).await;是同步調用,也就是說線上程阻塞在process函數時并沒有其它事情可做,整個線程必須要等到響應被完全寫入socket stream才能傳回。而這種并發處理與我們盡可能多的同時處理更多請求的初衷是不一緻的。

這裡筆者必須要指出,并發和并行完全是兩件事。多個任務交替執行是并發,并行是有多個人,一個人負責一個任務。而Rust的Tokio最大就是并發效率很高,線程并不需要去等待那些無效的任務,衆多并發任務之間由Tokio去統一排程。

Tokio的答案

Rust使用spawn關鍵字來建立此類并發任務的任務池,按照筆者的了解,這和線程池不是一個概念,因為并發的任務可能有多個線程共同處理,也可能隻有一個線程就搞定了。在使用Rust這種并發任務的異步函數使用async關鍵字修飾,在異步函數的函數體内任何類似于await的阻塞調用用都會使任務将控制權交還給線程。當操作程序在背景時,線程可以做其他工作。操作産生的結果也将形成一個Future,也就是未來才會産生的值被系統以變通的方式優化處理,改寫後的代碼如下:

use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};
#[tokio::main]
async fn main() {
// 綁定端口
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    loop {
        // 監控端口消息,對于每個socket請求,都啟動一個folk程序,進行處理
        let (socket, _) = listener.accept().await.unwrap();
        tokio::spawn(async move {
            process(socket).await;
        });
    }
}
async fn Process(socket: TcpStream) {
    let mut connection = Connection::new(socket);
    if let Some(frame) = connection.read_frame().await.unwrap() {
        println!("WE GOT: {:?}", frame);
        let response = Frame::Error("not finished".to_string());
        connection.write_frame(&response).await.unwrap();
    }
}      
Rust網絡程式設計架構-Tokio進階

Tokio的任務通過tokio::spawn來建立,spawn函數傳回一個JoinHandle,調用者可以使用JoinHandle它與Tokio的任務進行互動。async修飾的函數的傳回值以Future方式傳回。調用者可以使用.awai來Future的執行結果。代碼如下:

#[tokio::main]async fn main() {
    let handle = tokio::spawn(async {
          "hello beyondma"
    });
     let out = handle.await.unwrap();
    println!("GOT {}", out);
}      
Rust網絡程式設計架構-Tokio進階

上述程式運作結果為

GOT hello beyondma

當Tokio任務執行過程中遇到錯誤時,JoinHandle将傳回一個Err。當任務失敗時,或者當任務被強制關閉時,是鐵定會傳回ERR的。Tokio任務由Tokio排程器管理的最小可執行單元。正如上文所說Tokio的任務可能在同一個線程上執行,也可能在不同的線程上執行,這種多路複用機制可以參考上文《《小朋友也能聽懂的Rust網絡程式設計架構知識-Tokio基礎篇》》

Tokio任務之間的同步與通信

我們知道Rust有着比較獨特的變量生命周期機制,在之前的示例代碼當中都是用了move關鍵字來強制傳遞變量所屬關系的,如下:

tokio::spawn(async move {

            process(socket).await;

        });

那麼如何在各個Tokio任務之間進行通信與狀态同步也是個值得在本文中讨論的問題。

這裡我們先來讨論比較簡單的情況,可以用Arc<Mutex<_>>類型,也就是加互斥鎖的哈希表來進行任務間的資訊傳遞與同步,使用clone方法來為每個任務擷取自己的哈希表執行個體。具體如下:

use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
    println!("Listening");
    let hashMap= Arc::new(Mutex::new(HashMap::new()));
    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // Clone the handle to the hash map.
        let thisMap = hashMap.clone();
        println!("Accepted");
        tokio::spawn(async move {
             let mut thisMap = thisMap .lock().unwrap();
             thisMap .insert("hello", "beyondma");
             println!("{:?}",thisMap );
           
        });
    }
}      
Rust網絡程式設計架構-Tokio進階

這樣随便找其它終端或者在本機上執行telnet 伺服器IP 6379

就可以看到以下結果

Listening
Accepted
{"hello": "beyondma"}      
Rust網絡程式設計架構-Tokio進階

這裡這個hashMap的确可以在程序之間進行資訊的共享與同步,但是在這種高并發的架構中一般還是推薦使用管道(channel)來進行相關操作。有關channel的話題我們會在下次再深入講解。

Tokio的任務非常輕,隻需要一個64位元組的上下文即可,考慮到Rust中也沒有GC機制,是以基于Tokio理論上完全可以做出比Golang支援更多并發的應用程式,這也是筆者會計劃用3篇左右的系列文章來對于Tokio進行詳細介紹的原因。