天天看點

用Rust實作一個多線程的web server

在本文之前,我們已經用Rust實作過一個單線程的web server的例子,但是單線程的web server不夠高效,所以本篇文章就來實作一個多線程的例子。

單線程web server存在的問題

請求只能串行處理,也就是說,在第一個連接處理完之前,不會處理第二個連接。考慮如下例子:

use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};
use std::fs;
use std::{thread, time};

/// Serves a single HTTP connection, then simulates a slow request.
///
/// A request that starts with `GET / HTTP/1.1\r\n` is answered with a
/// `200 OK` status line followed by the contents of `main.html`; any other
/// request gets `404 NOT FOUND` followed by the contents of `404.html`.
///
/// # Panics
///
/// Panics if reading from or writing to `stream` fails, or if the HTML file
/// cannot be read.
fn handle_client(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    // `read` may fill only part of the buffer, so inspect just the bytes received.
    let received = stream.read(&mut buffer).unwrap();
    let get = b"GET / HTTP/1.1\r\n";
    let (status_line, filename) = if buffer[..received].starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "main.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();
    let response = format!("{}{}", status_line, contents);

    // `write` is allowed to stop after a partial write; `write_all` keeps
    // going until the whole response has been handed to the socket.
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();

    // Sleep for 10 seconds to simulate a request that takes a long time to handle.
    let ten_seconds = time::Duration::from_millis(10000);
    thread::sleep(ten_seconds);
}

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")? ;

    for stream in listener.incoming() {
        handle_client(stream?);
    }
    Ok(())
}      

在瀏覽器中打開兩個視窗,分別輸入127.0.0.1:8080,會發現在第一個請求處理完之前,第二個請求不會得到響應。

使用多線程來解決問題

  • 解決方式

修改main函數代碼:

/// Thread-per-connection server: accepts connections on 127.0.0.1:8080 and
/// serves each one on a freshly spawned thread.
///
/// # Errors
///
/// Returns the I/O error if binding the listener or accepting a connection fails.
///
/// # Panics
///
/// Panics if a joined handler thread has itself panicked.
fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;
    let mut thread_vec: Vec<thread::JoinHandle<()>> = Vec::new();

    for stream in listener.incoming() {
        // Report accept failures through the function's `io::Result`
        // instead of panicking.
        let stream = stream?;
        // `move` hands ownership of the connection to the new thread.
        let handle = thread::spawn(move || {
            handle_client(stream);
        });
        thread_vec.push(handle);
    }

    // NOTE(review): `TcpListener::incoming` is documented to never return
    // `None`, so in practice this join loop is not reached.
    for handle in thread_vec {
        handle.join().unwrap();
    }

    Ok(())
}

從瀏覽器打開兩個標籤頁進行測試,可以發現在第一個請求還沒有處理完之前,第二個請求已經開始處理。

  • 存在問題

當存在海量請求時,系統也會跟着建立海量的線程,最終造成系統崩潰。

使用線程池來解決問題

  • 線程池
(圖片:用Rust實作一個多線程的web server)
  • 知識點

多線程、管道。

主線程將任務發送到管道,工作線程等待在管道的接收端,收到任務時就進行處理。

線程池方式實作

1、初步設計

  • 定義ThreadPool結構
use std::thread;
/// First design sketch of the thread pool: it only stores thread handles.
pub struct ThreadPool {
    /// Join handles of the threads owned by the pool.
    threads: Vec<thread::JoinHandle<()>>,
}
  • 定義ThreadPool的方法
// Skeleton of the pool's API; both bodies are elided at this stage.
impl ThreadPool {
    // Constructor taking a `size`; implementation not shown yet.
    pub fn new(size: usize) -> ThreadPool {
        //--snip--
    }
    
    // Placeholder signature. The candidate generic signature is kept in the
    // comment below: a closure that runs once, can be sent to another thread
    // and is 'static.
    pub fn execute()
    //pub fn execute<F>(&self, f: F)
    //    where
    //        F: FnOnce() + Send + 'static
    {
        //--snip--
    }
}
  • 下面我們考慮new函數,可能的實作是這樣
/// Constructor sketch: reserves room for `size` thread handles but does not
/// spawn anything yet.
///
/// # Panics
///
/// Panics if `size` is zero.
pub fn new(size: usize) -> ThreadPool {
    assert!(size > 0);

    let mut threads = Vec::with_capacity(size);
    (0..size).for_each(|_| {
        // Create a thread here.
        // Open problem: spawning a thread requires a closure, i.e. the
        // concrete work to perform, but no concrete task exists yet at this
        // point. What should be passed?
    });

    ThreadPool { threads }
}
  • execute函數
// Design of `execute`, modelled on `thread::spawn`.
// `F` is a closure that is called once (`FnOnce()`), may be sent to another
// thread (`Send`) and holds no borrows shorter than `'static`.
pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static
{
    // Body not written yet at this stage of the design.
}

初步設計的問題總結:

主要問題出在建立線程池的new函數中:建立線程時需要傳入閉包(也就是具體的任務),可是此時還沒有具體的任務,如何解決?

2、解決線程建立的問題

  • 重新定義ThreadPool結構體
/// Thread pool that holds a list of `Worker`s.
pub struct ThreadPool {
    // The workers that make up the pool.
    workers: Vec<Worker>,
}
  • ThreadPool的new方法
pub fn new(size: usize) -> ThreadPool {
    assert!(size > 0);

    let mut workers = Vec::with_capacity(size);

    for id in 0..size {
        workers.push(Worker::new(id));
    }

    ThreadPool {
        workers
    }
}      
  • 在worker中建立線程
/// A pool member: a numeric id paired with the handle of its thread.
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    /// Creates worker `id` together with its thread.
    ///
    /// The thread is spawned with an empty closure, so it has no work to do yet.
    fn new(id: usize) -> Worker {
        Worker {
            id,
            thread: thread::spawn(|| {}),
        }
    }
}

3、發送任務

  • 進一步将ThreadPool結構設計為
use std::sync::mpsc;

/// Thread pool that additionally holds the sending half of a `Job` channel.
pub struct ThreadPool {
    workers: Vec<Worker>,
    // Sending half of the mpsc channel that carries `Job` values.
    sender: mpsc::Sender<Job>,
}

// Placeholder unit struct standing in for the job type.
struct Job;
  • 完善new方法
impl ThreadPool {
    // --snip--
    // Constructor extended with a job channel. Panics if `size` is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        // New in this step: the channel whose sending half is kept by the pool.
        let (sender, receiver) = mpsc::channel();
        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            // Previously: workers.push(Worker::new(id));
            // NOTE: `receiver` is passed by value, so it is moved away on the
            // first iteration; this is the statement that does not compile.
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool {
            workers,
            sender, // new in this step
        }
    }
    // --snip--
}

//--snip--

impl Worker {
    // Creates worker `id`; takes the receiving half of the job channel by value.
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            // Placeholder: `receiver` is only mentioned so that the closure
            // uses it; nothing is received from the channel yet.
            receiver;
        });

        Worker {
            id,
            thread,
        }
    }
}

此段代碼錯誤:receiver在第一次循環時就被移動進了Worker::new,無法再傳給其餘的worker;而且mpsc::Receiver既不能克隆,也不是線程安全的(未實作Sync),不能直接在多個線程間共享。所以應該使用Arc<Mutex<T>>。重新撰寫new方法如下:

use std::sync::Arc;
use std::sync::Mutex;
// --snip--

impl ThreadPool {
    // --snip--
    /// Builds a pool of `size` workers that all pull jobs from one channel.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, rx) = mpsc::channel();

        // Every worker needs access to the single receiver, so it is wrapped
        // in `Arc<Mutex<..>>` and each worker gets its own `Arc` clone.
        let shared_rx = Arc::new(Mutex::new(rx));

        let workers = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&shared_rx)))
            .collect();

        ThreadPool { workers, sender }
    }

    // --snip--
}

impl Worker {
    /// Spawns worker `id`, whose thread runs jobs taken from the shared `receiver`.
    ///
    /// The thread keeps receiving and running jobs until the channel is
    /// closed (every sender dropped), then ends normally.
    ///
    /// # Panics
    ///
    /// The worker thread panics if the receiver mutex is poisoned.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // Bind the result first: the mutex guard is a temporary of this
            // statement, so the lock is released before the job runs.
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!("Worker {} got a job; executing.", id);

                    job();
                }
                // `recv` only fails once every sender is gone; leave the loop
                // instead of panicking in each worker thread.
                Err(_) => break,
            }
        });

        Worker { id, thread }
    }
}
  • 實作execute方法
/// `Job` is now a type alias for a trait object: a boxed closure that is
/// called once and can be sent to another thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--

    /// Boxes `f` and sends it down the pool's job channel.
    ///
    /// # Panics
    ///
    /// Panics if the job cannot be sent because the receiving side is gone.
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let boxed: Job = Box::new(f);
        self.sender.send(boxed).unwrap();
    }
}

完整代碼

src/main.rs

use std::fs;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::{thread, time};
use mylib::ThreadPool;

/// Serves a single HTTP connection, then simulates a slow request.
///
/// A request that starts with `GET / HTTP/1.1\r\n` is answered with a
/// `200 OK` status line followed by the contents of `main.html`; any other
/// request gets `404 NOT FOUND` followed by the contents of `404.html`.
///
/// # Panics
///
/// Panics if reading from or writing to `stream` fails, or if the HTML file
/// cannot be read.
fn handle_client(mut stream: TcpStream) {
    let mut buffer = [0; 512];
    // `read` may fill only part of the buffer, so inspect just the bytes received.
    let received = stream.read(&mut buffer).unwrap();
    let get = b"GET / HTTP/1.1\r\n";
    let (status_line, filename) = if buffer[..received].starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "main.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();
    let response = format!("{}{}", status_line, contents);

    // `write` is allowed to stop after a partial write; `write_all` keeps
    // going until the whole response has been handed to the socket.
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();

    // Sleep for 10 seconds to simulate a request that takes a long time to handle.
    let ten_seconds = time::Duration::from_millis(10000);
    thread::sleep(ten_seconds);
}

/// Thread-pool server: accepts connections on 127.0.0.1:8080 and hands each
/// one to a pool created with `ThreadPool::new(4)`.
///
/// # Errors
///
/// Returns the I/O error if binding the listener or accepting a connection fails.
fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080")?;

    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        // Report accept failures through the function's `io::Result`
        // instead of panicking.
        let stream = stream?;

        // `move` transfers ownership of the connection into the job.
        pool.execute(move || {
            handle_client(stream);
        });
    }

    Ok(())
}

mylib/src/lib.rs

use std::thread;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;

/// A pool member: a numeric id paired with the handle of its thread.
struct Worker {
    // Identifier printed in the worker's log line.
    id: usize,
    // Handle of the thread spawned for this worker.
    thread: thread::JoinHandle<()>,
}

impl Worker {
    /// Spawns worker `id`, whose thread runs jobs taken from the shared `receiver`.
    ///
    /// The thread keeps receiving and running jobs until the channel is
    /// closed (every sender dropped), then ends normally.
    ///
    /// # Panics
    ///
    /// The worker thread panics if the receiver mutex is poisoned.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // Bind the result first: the mutex guard is a temporary of this
            // statement, so the lock is released before the job runs.
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!("Worker {} got a job; executing.", id);

                    job();
                }
                // `recv` only fails once every sender is gone; leave the loop
                // instead of panicking in each worker thread.
                Err(_) => break,
            }
        });

        Worker { id, thread }
    }
}

/// Thread pool: a list of workers plus the sending half of the job channel.
pub struct ThreadPool {
    workers: Vec<Worker>,
    // Sending half of the mpsc channel that carries `Job` values.
    sender: mpsc::Sender<Job>,
}

// Earlier placeholder: struct Job;
// `Job` is now a type alias for a trait object: a boxed closure that is
// called once and can be sent to another thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        // let mut threads = Vec::with_capacity(size);
        // for _ in 0..size {
        //     //建立線程:
        //     //問題來了,建立線程的時候需要傳入閉包,也就是具體做的動作,
        //     //可是這個時候我們還沒有具體的任務,怎麼辦?
        // }
            
        // ThreadPool {
        //     threads
        // }

        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
    
        for id in 0..size {
            //workers.push(Worker::new(id));
            //workers.push(Worker::new(id, receiver));
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
    
        ThreadPool {
            workers,
            sender,
        }
    }
    
    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}      
# Path dependency on the local `mylib` crate, which main.rs imports as `mylib::ThreadPool`.
[dependencies]
mylib = {path = "./mylib"}

目前版本存在的問題