天天看點

rust學習(二)mio和socketpair及多線程

前言

上章學習了rust 環境搭建及IDE配置

這章主要講述 模拟C++的socketpair,順便用下多線程

rust 中文教程 位址:https://kaisery.github.io/trpl-zh-cn/

english :https://doc.rust-lang.org/book/

第三方庫:https://docs.rs/

标準庫:https://www.rustwiki.org.cn/zh-CN/std/index.html

目前使用的rust 1.65版本

1:mio庫

目前 已經是0.8.5版本了,上次的demo用的0.6.* 版本 ,tokio 也用mio

Mio 提供可跨平台的 system selector 通路,是以 Linux epoll、Windows IOCP、FreeBSD/macOS kqueue,甚至許多有潛力的平台都可調用相同的 API。 不同平台使用 Mio API 的開銷不盡相同。 由于 Mio 是提供基于 readiness(就緒狀态)的 API,與 Linux epoll 相似,不少 API 在 Linux 上都可以一對一映射。 (例如:mio::Events 實質上是一個 struct epoll_event 陣列。 )對比之下,Windows IOCP 是基于完成(completion-based)而非基于 readiness 的 API,是以兩者間會需要較多橋接。 同時mio提供自身版本的TcpListener、TcpStream、UdpSocket,這些API封裝了底層平台相關API,并設為非阻塞且實作Evented trait(mio 0.7 起該 trait 已改為 event::Source)。

rust學習(二)mio和socketpair及多線程

2: socketpair

雖然有channel

但是不可能讓線程一直阻塞等着接收消息

是以跟c++ 的一樣的,有消息存在,發送個通知給對方,這樣隻需要調用 poll 等待通知即可

Token(1) 為socketpair專用

rust學習(二)mio和socketpair及多線程

channel send 一條消息,socket發送一個位元組,收到多少位元組就調用多少次receiver1.recv()

std::sync::mpsc::channel;//支援多Sender,僅支援1個Receiver,可保證接收消息的順序與發送的順序一緻。

let (sender1, receiver1) = channel::<Trans_Data>(); //channel::<>()

let (sender2, receiver2) = channel::<Trans_Data>();

rust學習(二)mio和socketpair及多線程

3:code

rust學習(二)mio和socketpair及多線程
extern crate mio;
extern  crate rand;

use std::net;
use std::io ;


use mio::{Events, Poll, Interest, Token};
use mio::net::TcpStream ;

use std::sync::mpsc::channel;//支援多Sender,僅支援1個Receiver,可保證接收消息的順序與發送的順序一緻。
use std::{thread,time};
use std::io::{Write, Read, Error};
use std::time::{Duration};

use std::io::{ ErrorKind};


/// Emulates `socketpair(2)` with two connected loopback TCP streams.
///
/// A temporary listener is bound to `127.0.0.1` on an OS-chosen port, one
/// stream connects to it and the other is the accepted side of that
/// connection. The listener is closed when the function returns.
///
/// Returns `(connecting_side, accepted_side)`.
///
/// # Errors
///
/// Propagates any I/O error from binding, querying the local address,
/// connecting, or accepting.
fn socket_pair()->std::io::Result<(std::net::TcpStream,std::net::TcpStream)>{
    let acceptor = net::TcpListener::bind("127.0.0.1:0")?;
    let connector = net::TcpStream::connect(acceptor.local_addr()?)?;
    // The connection is already queued on the listener, so accept it now.
    let (accepted, _peer_addr) = acceptor.accept()?;
    Ok((connector, accepted))
}

/// Message passed between the two peers over the `mpsc` channels.
///
/// Not `Copy`: `context` owns a heap-allocated `String`.
// The name predates Rust naming conventions; it is kept so existing users of
// the type keep compiling, hence the lint is silenced instead of renaming.
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, PartialEq, Eq)]
struct Trans_Data {
    /// Identifier of the user the message belongs to.
    userid: u32,
    /// Sequence number of the message.
    msgindex: u32,
    /// Text payload; `main` fills it as `context=<userid>+<msgindex>`.
    context: String,
}


fn main() {
    println!("Hello, world!");
    let x = rand::random::<u8>();
    println!("num={}",x);

    let mut poll1 = Poll::new().expect("Failed to create Poll_1");
    let mut poll2 = Poll::new().expect("Failed to create Poll_2");

    let(socket1,socket2) = match  socket_pair() {
        Ok((s1,s2))=> (s1,s2),
        _=>unreachable!()
    } ;

    let mut fd1=   register(& mut poll1, &  socket1,Token(1)).unwrap();
    let mut fd2=   register(& mut poll2,&   socket2,Token(1)).unwrap();

    // let (sender1, receiver1) = channel(); //channel::<>()
    //
    // let (sender2, receiver2) = channel();

    let (sender1, receiver1) = channel::<Trans_Data>(); //channel::<>()

    let (sender2, receiver2) = channel::<Trans_Data>();
    const WAITGEPMS: u32 = 1000;
    thread::spawn(move|| {
        let mut events = Events::with_capacity(4);
        sender2.send(Trans_Data{
            userid: 1,
            msgindex:1,
            context:String::from("context=1+1")
        }).unwrap() ;
//        sender2.send(2).unwrap() ;
//        let mut s = [0 as u8; 1];
//        s[0] = 1;
        //  fd1.write(&s[..]) ;
//        fd1.write(&[1]);
        fd1.write(&[1]);
        //     println!("thread1 send");
        //   const WAITGEPMS: u32 = 1000;
        let geptime = Some(Duration::from_millis(WAITGEPMS as u64));

        loop {
            match poll1.poll(&mut events, geptime){
                Ok(())=>{},  //println!("[1111111]OK")
                _=>{println!("[1111111] num= ERR");}
            }

            for event in &events {
                if event.token() == Token(1) {
                  //  println!("recv [11111] event.token()");

                    //event.is_readable()
                    if event.is_readable() {
                        let mut buf=[0u8;4096];
                        let bytes = match fd1.read(&mut buf) {
                            Ok(n) => {
                                if n > 1 {
                                    println!("[11111] read data len={}",n);
                                }
                                n
                            },
                            Err(e)=> {
                                if e.kind() == ErrorKind::WouldBlock {
                                    0
                                }else{
                                    println!("[1111111] read data fail={}",e); 0
                                }
                            }
                        };
                        thread::sleep(time::Duration::from_millis(1));
                        for i in  0 ..bytes {
                            let mut recvnum = receiver1.recv().unwrap();
                           // sender2.send(recvnum+1).unwrap() ;
                           let mut data1 =  recvnum as Trans_Data  ;

                            let tmpuser = data1.userid;
                            let mut tmpmsgindex = data1.msgindex ;

                            let msgcount =  data1.msgindex ;
                            data1.msgindex += 1 ;
                           // data1.context=String::from("context=");
                            let mut  tmpcontext = String::from("context=")+ &data1.userid.to_string() ;
                            tmpcontext += "+" ;
                            tmpcontext+= &data1.msgindex.to_string();
                            data1.context = tmpcontext;
                         //   data1.context = "context="+

                            sender2.send(data1).unwrap() ;

                            fd1.write(&[1]).unwrap();

                            let tmpnum = rand::random::<u8>();

                            for j in 0..(tmpnum&0xF) {
                                let mut  tmpcontext3 = String::from("context=")+ &tmpuser.to_string() ;
                                tmpcontext3 += "+" ;
                                tmpmsgindex += 1;
                                tmpcontext3 += &tmpmsgindex.to_string();

                                sender2.send(Trans_Data{
                                    userid: tmpuser,
                                    msgindex:tmpmsgindex,
                                    context:tmpcontext3
                                }).unwrap() ;

                            }
                         //   println!("[1111111] recv data={:?} send data={}", msgcount,tmpmsgindex);
                        }
                    }
                    reregister(& mut poll1,& mut fd1,Token(1));
                }
            }
//            let mut recvnum = receiver1.recv().unwrap();
//            println!("[1112222222] recv data={:?} send data={}", recvnum,recvnum+1);
        }
    });

    {
        //  const WAITGEPMS: u32 = 1000;
        let geptime = Some(Duration::from_millis(WAITGEPMS as u64));
        let mut events = Events::with_capacity(4);
        loop {

            match poll2.poll(&mut events, geptime){
               // Ok(num)=>{println!("[222222] num= {}",num);},
                Ok(())=>{}, //println!("[22222]OK")
                _=>{println!("[222222] num= ERR");}
            }

            for event in &events {
                if event.token() == Token(1) {
                  //  println!("recv [22222] event.token()");
                    if event.is_readable() {
                        let mut buf=[0u8;4096];
                        let bytes = match fd2.read(&mut buf) {
                            Ok(n) => {
                                if n > 1 {
                                    println!("[222222] read data len={}", n);
                                }
                                n
                                //  println!("[222222] read data len={}",n); n},
                            }
                            Err(e)=> {
                                if e.kind() == ErrorKind::WouldBlock {
                                    0
                                } else {
                                    println!("[222222] read data fail={}", e);
                                    0
                                }
                            }
                        };
                        //   println!("[222222] read data len={}",bytes);
                        thread::sleep(time::Duration::from_millis(1));
                        for i in  0 ..bytes {
                            let mut recvnum = receiver2.recv().unwrap();

                            let mut data1 =  recvnum as Trans_Data  ;

                            let tmpuser = data1.userid;
                            let mut tmpmsgindex = data1.msgindex ;

                            let msgcount =  data1.msgindex ;
                            data1.msgindex += 1 ;

                            let mut  tmpcontext = String::from("context=")+ &data1.userid.to_string() ;
                            tmpcontext += "+" ;
                            tmpcontext+= &data1.msgindex.to_string();
                            data1.context = tmpcontext;
                            sender1.send(data1).unwrap() ;
                            //     recvnum += 1 ;
                           // sender1.send(recvnum+1).unwrap() ;
                            fd2.write(&[1]).unwrap();

                            let tmpnum = rand::random::<u8>();

                            for j in 0..(tmpnum&0x1F) {
                                let mut  tmpcontext3 = String::from("context=")+ &tmpuser.to_string() ;
                                tmpcontext3 += "+" ;
                                tmpmsgindex += 1;
                                tmpcontext3 += &tmpmsgindex.to_string();

                                sender1.send(Trans_Data{
                                    userid: tmpuser,
                                    msgindex:tmpmsgindex,
                                    context:tmpcontext3
                                }).unwrap() ;

                            }
                          //  println!("[222222] recv data={:?} send data={}", msgcount,tmpmsgindex);
                        }
                    }
                    reregister(& mut poll2,& mut fd2,Token(1));
                }
            }
        }
    }


}

// #[cfg(target_os = "linux")]
// fn get_stream(sock:&  std::net::TcpStream)->mio::net::TcpStream{
//
// }

// #[cfg(target_os = "windows")]
// fn get_stream(sock:&  std::net::TcpStream)->mio::net::TcpStream{
//    // unsafe { mio::net::TcpStream::FromRawSocket::from_raw_socket(*sock) }
//     mio::net::TcpStream::connect("")
// }

pub fn register( poll: &mut Poll,sock:&  std::net::TcpStream,token:Token) -> io::Result<(mio::net::TcpStream)> { //& mut

    let mut  stream = mio::net::TcpStream::from_std((*sock).try_clone()?);
    // if cfg!(target_os = "linux") {
    //     println!("linux");
    // } else {
    //     println!("not linux");
    // }
    match poll.registry().register(
         &mut stream,
         token,
         Interest::READABLE | Interest::WRITABLE){
        Ok(_)=>Ok(stream),
        _=>unreachable!()
    }
}

/// Re-registers `stream` with `poll` under `token`, again asking for both
/// readable and writable interest.
///
/// # Errors
///
/// Returns the I/O error reported by the registry unchanged.
pub fn reregister(poll: &mut Poll, stream:& mut mio::net::TcpStream,token:Token) -> io::Result<()> {
    let interests = Interest::READABLE | Interest::WRITABLE;
    poll.registry().reregister(stream, token, interests)
}

           

繼續閱讀