什麼是Thread-Per-Message模式
Thread-Per-Message的意思是為每一個消息的處理開辟一個線程使得消息能夠以并發的方式進行處理,進而提高系統整體的吞吐能力。
這就好比電話接線員一樣,收到的每一個電話投訴或者業務處理請求,都會送出對應的工單,然後交由對應的從業人員來處理。
每個任務一個線程:
package MutilThreadModel.ThreadPerMessageModel;
/**
* Created by JYM on 2019/1/16
* 客戶送出的任何業務受理請求都會被封裝成Request對象
* */
public class Request
{
private final String business;
public Request(String business)
{
this.business = business;
}
@Override
public String toString()
{
return business;
}
}
客戶送出的任何業務受理請求都會被封裝成Request對象。
package MutilThreadModel.ThreadPerMessageModel;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.ThreadLocalRandom.current;
/**
* Created by JYM on 2019/1/16
* TaskHandler代表了每一個從業人員接收到任務後的處理邏輯
* */
/*
* TaskHandler用于處理每一個送出的Request請求,由于TaskHandler将被Thread執行,
* 是以需要實作Runnable接口
* */
public class TaskHandler implements Runnable
{
//需要處理的Request請求
private final Request request;
public TaskHandler(Request request)
{
this.request = request;
}
@Override
public void run() {
System.out.println("Begin handle "+request);
slowly();
System.out.println("End handle "+request);
}
//模拟請求處理比較耗時,使線程進入短暫的休眠階段
private void slowly()
{
try{
TimeUnit.SECONDS.sleep(current().nextInt(10));
}catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
TaskHandler代表了每一個從業人員接收到任務後的處理邏輯。
package MutilThreadModel.ThreadPerMessageModel;
/**
* Created by JYM on 2019/1/16
* */
public class Operator
{
public void call(String business)
{
//為每一個請求建立一個線程去處理
TaskHandler taskHandler = new TaskHandler(new Request(business));
new Thread(taskHandler).start();
}
}
/*
* Operator代表了接線員,當有電話打進來時,話務員會将客戶的請求封裝成一個工單Request,然後開辟一個線程(從業人員)去處理。
* 截止目前,我們完成了關于Thread-Per-Message的設計,但是這種設計方式存在着很嚴重的問題,經過第2章的學習,我們知道每一個JVM
* 中可建立的線程數量是有限的,針對每一個任務都建立一個新的線程,假如每一個線程執行的時間比較長,那麼在某個時刻JVM會由于無法在建立
* 新的線程而導緻棧記憶體的溢出;在假如每一個任務的執行時間都比較短,頻繁地建立銷毀線程對系統性能的開銷也是一個不小的影響。
* 這種處理方式雖然有很多問題,但不代表其就一無是處了,其實他也有自己的使用場景,比如在基于Event的程式設計模型中,當系統初始化時間發生時
* ,需要進行若幹資源的背景加載,由于系統初始化時的任務數量并不多,可以考慮使用該模式響應初始化Event,或者系統在關閉時,進行資源回收也可以
* 考慮将銷毀事件觸發的動作交給該模式。
* 我們可以将call方法中的建立新線程的方式交給線程池去處理,這樣可以避免線程頻繁建立和銷毀帶來的系統開銷,還能将線程數量控制在一個可控的範圍之内。
* */
package MutilThreadModel.ThreadPerMessageModel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created By JYM on 2019/1/16
* 重構Operator*/
public class Operator_re
{
//使用線程池替代為每一個請求建立線程
private final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
public void call(String business)
{
TaskHandler taskHandler = new TaskHandler(new Request(business));
fixedThreadPool.execute(taskHandler);
}
}
多使用者的網絡聊天
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by JYM on 2019/1/16
* 多使用者的網絡聊天:
* Thread-Per-Message模式在網絡通信中的使用也是非常廣泛的,比如在本節中介紹的網絡聊天程式,
* 在服務端每一個連接配接到服務端的連接配接都将建立一個獨立的線程進行處理,當用戶端的連接配接數超過了服務端
* 的最大受理能力時,用戶端将被存放至排隊隊列中。
* */
/*
* 下面編寫服務端程式ChatServer用于接收來自用戶端的連結,并且與之進行TCP通信互動,
* 當服務端接收到了每一次的用戶端連接配接後便會給線程池送出一個任務用于與用戶端進行互動,進而提高并發響應能力。
* */
public class ChatServer
{
//服務端端口
private final int port;
//定義線程池,
private ExecutorService fixedThreadPool;
//服務端Socket
private ServerSocket serverSocket;
//通過構造函數傳入端口
public ChatServer(int port)
{
this.port = port;
}
//預設使用13312端口
public ChatServer()
{
this(13312);
}
public void startServer() throws IOException
{
//建立線程池
this.fixedThreadPool = Executors.newFixedThreadPool(10);
this.serverSocket = new ServerSocket(port);
this.serverSocket.setReuseAddress(true);
System.out.println("Chat server is started and listen at port: "+port);
}
private void listen() throws IOException
{
for (; ;)
{
//accept方法是阻塞方法,當有新的連結進入時才會傳回,并且傳回的是用戶端的連接配接
Socket client = serverSocket.accept();
//将用戶端連接配接作為一個Request封裝成對應的Handler然後送出給線程池
this.fixedThreadPool.execute(new ClientHandler(client));
}
}
}
在上面的程式中,當接收到了新的用戶端連接配接時,會為每一個用戶端連接配接建立一個線程ClientHandler與用戶端進行互動,當用戶端的連接配接個數超過線程池的最大數量時,用戶端雖然可以成功接入服務端,但是會進入阻塞隊列。
package MutilThreadModel.ThreadPerMessageModel;
import java.io.*;
import java.net.Socket;
/**
* Created by JYM on 2019/1/16
* 待伺服器端接收到用戶端的連接配接之後,便會建立一個新的ClientHandler任務送出給線程池,ClientHandler任務是
* Runnable接口的實作,主要負責和用戶端進行你來我往的簡單通信。
* */
//ClientHandler同樣也是一個Runnable接口的實作
public class ClientHandler implements Runnable
{
//用戶端的socket連接配接
private final Socket socket;
//用戶端的identity
private final String clientIdentify;
//通過構造函數傳入用戶端連接配接
public ClientHandler(final Socket socket)
{
this.socket = socket;
this.clientIdentify = socket.getInetAddress().getHostAddress()+":"+socket.getPort();
}
@Override
public void run() {
try{
this.chat();
}catch (IOException e)
{
e.printStackTrace();
}
}
private void chat() throws IOException
{
BufferedReader bufferedReader = wrap2Reader(this.socket.getInputStream());
PrintStream printStream = wrap2Print(this.socket.getOutputStream());
String received;
while ((received = bufferedReader.readLine()) != null)
{
//将用戶端發送的消息輸出到控制台
System.out.printf("client:%s-message:%s\n",clientIdentify,received);
if (received.equals("quit"))
{
//如果用戶端發送了quit指令,則斷開與用戶端的連接配接
write2Client(printStream,"client will close");
socket.close();
break;
}
//向用戶端發送消息
write2Client(printStream,"server"+received);
}
}
//将輸入位元組流封裝成BufferedReader緩沖字元流
private BufferedReader wrap2Reader(InputStream inputStream)
{
return new BufferedReader(new InputStreamReader(inputStream));
}
//将輸出位元組流封裝成PrintStream
private PrintStream wrap2Print(OutputStream outputStream)
{
return new PrintStream(outputStream);
}
//該方法主要用于向用戶端發送消息
private void write2Client(PrintStream print,String message)
{
print.println(message);
print.flush();
}
}
聊天程式測試:
package MutilThreadModel.ThreadPerMessageModel;
import java.io.IOException;
public class ChatTest
{
public static void main(String[] args) throws IOException
{
new ChatServer().startServer();
}
}