前言
上章學習了rust 環境搭建及IDE配置
這章主要講述 模拟C++的socketpair,順便用下多線程
rust 中文教程 位址:https://kaisery.github.io/trpl-zh-cn/
english :https://doc.rust-lang.org/book/
第三方庫:https://docs.rs/
标準庫:https://www.rustwiki.org.cn/zh-CN/std/index.html
目前使用的rust 1.65版本
1:mio庫
目前 已經是0.8.5版本了,上次的demo用的0.6.* 版本 ,tokio 也用mio
Mio 提供可跨平台的 system selector 存取,因此 Linux epoll、Windows IOCP、FreeBSD/macOS kqueue,甚至許多有潛力的平台都可調用相同的 API。 不同平台使用 Mio API 的開銷不盡相同。 由于 Mio 提供的是基于 readiness(就緒狀态)的 API,與 Linux epoll 相似,不少 API 在 Linux 上都可以一對一映射。 (例如:`mio::Events` 實質上是一個 `struct epoll_event` 陣列。 )對比之下,Windows IOCP 是基于完成(completion-based)而非基于 readiness 的 API,因此兩者間會需要較多橋接。 同時 mio 提供自身版本的 TcpListener、TcpStream、UdpSocket,這些 API 封裝了底層平台相關 API,并設為非阻塞且實作 `event::Source` trait(在 0.6 版本中稱為 `Evented` trait)。
2: socketpair
雖然有 channel,
但是不可能讓線程一直阻塞等着接收消息,
所以跟 C++ 的做法一樣:有消息時,通過 socket 發送一個通知給對方,這樣接收方隻需要調用 poll 等待事件即可。
Token(1) 為 socketpair 專用。
channel 每 send 一條消息,socket 就發送一個位元組;收到多少位元組,就調用多少次 receiver1.recv()。
std::sync::mpsc::channel;//支援多Sender,僅支援1個Receiver,可保證接收消息的順序與發送的順序一緻。
let (sender1, receiver1) = channel::<Trans_Data>(); //channel::<>()
let (sender2, receiver2) = channel::<Trans_Data>();
3:code
extern crate mio;
extern crate rand;
use std::net;
use std::io ;
use mio::{Events, Poll, Interest, Token};
use mio::net::TcpStream ;
use std::sync::mpsc::channel;//支援多Sender,僅支援1個Receiver,可保證接收消息的順序與發送的順序一緻。
use std::{thread,time};
use std::io::{Write, Read, Error};
use std::time::{Duration};
use std::io::{ ErrorKind};
/// Emulates `socketpair(2)` with two connected loopback TCP streams.
///
/// A temporary listener is bound to an OS-chosen port on `127.0.0.1`, one
/// stream connects to it and the other is obtained from `accept`. The listener
/// is closed when the function returns.
///
/// # Errors
///
/// Returns any I/O error from `bind`, `local_addr`, `connect` or `accept`.
/// Returns `ErrorKind::ConnectionAborted` when the accepted connection does not
/// come from the stream this function just connected, because another local
/// process could otherwise race for the ephemeral port and be paired with us.
fn socket_pair() -> std::io::Result<(std::net::TcpStream, std::net::TcpStream)> {
    let listener = net::TcpListener::bind("127.0.0.1:0")?;
    let listen_addr = listener.local_addr()?;
    let stream1 = net::TcpStream::connect(&listen_addr)?;

    // The connect above has completed, so our connection is already queued
    // and this blocking accept returns promptly.
    let (stream2, peer_addr) = listener.accept()?;

    // Authenticate the accepted side: its remote address must be the local
    // address of the stream we connected ourselves.
    if peer_addr != stream1.local_addr()? {
        return Err(std::io::Error::new(
            std::io::ErrorKind::ConnectionAborted,
            "socket_pair: accepted a connection that did not originate from this process",
        ));
    }

    Ok((stream1, stream2))
}
/// Message payload exchanged between the two threads over the mpsc channels.
///
/// The name is kept as-is (not `UpperCamelCase`) because the rest of the file
/// refers to it, hence the lint allowance below.
#[allow(non_camel_case_types)]
#[derive(Debug, Clone, PartialEq, Eq)]
struct Trans_Data {
    /// Identifier of the user the message belongs to.
    userid: u32,
    /// Sequence number of the message; incremented by each relay step.
    msgindex: u32,
    /// Human-readable text, built as `context=<userid>+<msgindex>`.
    context: String,
}
fn main() {
println!("Hello, world!");
let x = rand::random::<u8>();
println!("num={}",x);
let mut poll1 = Poll::new().expect("Failed to create Poll_1");
let mut poll2 = Poll::new().expect("Failed to create Poll_2");
let(socket1,socket2) = match socket_pair() {
Ok((s1,s2))=> (s1,s2),
_=>unreachable!()
} ;
let mut fd1= register(& mut poll1, & socket1,Token(1)).unwrap();
let mut fd2= register(& mut poll2,& socket2,Token(1)).unwrap();
// let (sender1, receiver1) = channel(); //channel::<>()
//
// let (sender2, receiver2) = channel();
let (sender1, receiver1) = channel::<Trans_Data>(); //channel::<>()
let (sender2, receiver2) = channel::<Trans_Data>();
const WAITGEPMS: u32 = 1000;
thread::spawn(move|| {
let mut events = Events::with_capacity(4);
sender2.send(Trans_Data{
userid: 1,
msgindex:1,
context:String::from("context=1+1")
}).unwrap() ;
// sender2.send(2).unwrap() ;
// let mut s = [0 as u8; 1];
// s[0] = 1;
// fd1.write(&s[..]) ;
// fd1.write(&[1]);
fd1.write(&[1]);
// println!("thread1 send");
// const WAITGEPMS: u32 = 1000;
let geptime = Some(Duration::from_millis(WAITGEPMS as u64));
loop {
match poll1.poll(&mut events, geptime){
Ok(())=>{}, //println!("[1111111]OK")
_=>{println!("[1111111] num= ERR");}
}
for event in &events {
if event.token() == Token(1) {
// println!("recv [11111] event.token()");
//event.is_readable()
if event.is_readable() {
let mut buf=[0u8;4096];
let bytes = match fd1.read(&mut buf) {
Ok(n) => {
if n > 1 {
println!("[11111] read data len={}",n);
}
n
},
Err(e)=> {
if e.kind() == ErrorKind::WouldBlock {
0
}else{
println!("[1111111] read data fail={}",e); 0
}
}
};
thread::sleep(time::Duration::from_millis(1));
for i in 0 ..bytes {
let mut recvnum = receiver1.recv().unwrap();
// sender2.send(recvnum+1).unwrap() ;
let mut data1 = recvnum as Trans_Data ;
let tmpuser = data1.userid;
let mut tmpmsgindex = data1.msgindex ;
let msgcount = data1.msgindex ;
data1.msgindex += 1 ;
// data1.context=String::from("context=");
let mut tmpcontext = String::from("context=")+ &data1.userid.to_string() ;
tmpcontext += "+" ;
tmpcontext+= &data1.msgindex.to_string();
data1.context = tmpcontext;
// data1.context = "context="+
sender2.send(data1).unwrap() ;
fd1.write(&[1]).unwrap();
let tmpnum = rand::random::<u8>();
for j in 0..(tmpnum&0xF) {
let mut tmpcontext3 = String::from("context=")+ &tmpuser.to_string() ;
tmpcontext3 += "+" ;
tmpmsgindex += 1;
tmpcontext3 += &tmpmsgindex.to_string();
sender2.send(Trans_Data{
userid: tmpuser,
msgindex:tmpmsgindex,
context:tmpcontext3
}).unwrap() ;
}
// println!("[1111111] recv data={:?} send data={}", msgcount,tmpmsgindex);
}
}
reregister(& mut poll1,& mut fd1,Token(1));
}
}
// let mut recvnum = receiver1.recv().unwrap();
// println!("[1112222222] recv data={:?} send data={}", recvnum,recvnum+1);
}
});
{
// const WAITGEPMS: u32 = 1000;
let geptime = Some(Duration::from_millis(WAITGEPMS as u64));
let mut events = Events::with_capacity(4);
loop {
match poll2.poll(&mut events, geptime){
// Ok(num)=>{println!("[222222] num= {}",num);},
Ok(())=>{}, //println!("[22222]OK")
_=>{println!("[222222] num= ERR");}
}
for event in &events {
if event.token() == Token(1) {
// println!("recv [22222] event.token()");
if event.is_readable() {
let mut buf=[0u8;4096];
let bytes = match fd2.read(&mut buf) {
Ok(n) => {
if n > 1 {
println!("[222222] read data len={}", n);
}
n
// println!("[222222] read data len={}",n); n},
}
Err(e)=> {
if e.kind() == ErrorKind::WouldBlock {
0
} else {
println!("[222222] read data fail={}", e);
0
}
}
};
// println!("[222222] read data len={}",bytes);
thread::sleep(time::Duration::from_millis(1));
for i in 0 ..bytes {
let mut recvnum = receiver2.recv().unwrap();
let mut data1 = recvnum as Trans_Data ;
let tmpuser = data1.userid;
let mut tmpmsgindex = data1.msgindex ;
let msgcount = data1.msgindex ;
data1.msgindex += 1 ;
let mut tmpcontext = String::from("context=")+ &data1.userid.to_string() ;
tmpcontext += "+" ;
tmpcontext+= &data1.msgindex.to_string();
data1.context = tmpcontext;
sender1.send(data1).unwrap() ;
// recvnum += 1 ;
// sender1.send(recvnum+1).unwrap() ;
fd2.write(&[1]).unwrap();
let tmpnum = rand::random::<u8>();
for j in 0..(tmpnum&0x1F) {
let mut tmpcontext3 = String::from("context=")+ &tmpuser.to_string() ;
tmpcontext3 += "+" ;
tmpmsgindex += 1;
tmpcontext3 += &tmpmsgindex.to_string();
sender1.send(Trans_Data{
userid: tmpuser,
msgindex:tmpmsgindex,
context:tmpcontext3
}).unwrap() ;
}
// println!("[222222] recv data={:?} send data={}", msgcount,tmpmsgindex);
}
}
reregister(& mut poll2,& mut fd2,Token(1));
}
}
}
}
}
// #[cfg(target_os = "linux")]
// fn get_stream(sock:& std::net::TcpStream)->mio::net::TcpStream{
//
// }
// #[cfg(target_os = "windows")]
// fn get_stream(sock:& std::net::TcpStream)->mio::net::TcpStream{
// // unsafe { mio::net::TcpStream::FromRawSocket::from_raw_socket(*sock) }
// mio::net::TcpStream::connect("")
// }
pub fn register( poll: &mut Poll,sock:& std::net::TcpStream,token:Token) -> io::Result<(mio::net::TcpStream)> { //& mut
let mut stream = mio::net::TcpStream::from_std((*sock).try_clone()?);
// if cfg!(target_os = "linux") {
// println!("linux");
// } else {
// println!("not linux");
// }
match poll.registry().register(
&mut stream,
token,
Interest::READABLE | Interest::WRITABLE){
Ok(_)=>Ok(stream),
_=>unreachable!()
}
}
/// Re-arms `stream` on `poll` under `token` for readable and writable readiness.
///
/// # Errors
///
/// Returns the I/O error reported by the registry, unchanged.
pub fn reregister(
    poll: &mut Poll,
    stream: &mut mio::net::TcpStream,
    token: Token,
) -> io::Result<()> {
    let interests = Interest::READABLE | Interest::WRITABLE;
    let registry = poll.registry();
    registry.reregister(stream, token, interests)
}