午安,打勞工!
最近終于終于有點閑空了,就翻了翻之前寫的學習筆記,就發現了上一篇文章。依稀記得當時還是在學校裡費了不少功夫才完成的,就突然想拿出來重新回顧一下順便改進改進,如若您發現有錯誤或者不合理的地方,懇請指點一二,在此感謝!
前言
這一類文章暫時沒有提供可視化界面,我會在基礎功能差不多具備的情況下進行設計和開發。
本篇文章思路可以參考上一篇,主要是對代碼進行修改,總體還是不難的。
目錄
common : 設計主要是用來存儲使用者資訊的以及後期一些公共類。
socketclient:用戶端
websocket: 服務端
common子產品
目前就存放一個使用者,大夥就随便看看就行。
@Data
public class User implements Serializable {
private String username;
private String message;
private String ip;
public User(String username,String message,String ip){
this.username = username;
this.message = message;
this.ip = ip;
}
public User(){
}
}
websocket
1、CoreServer
CoreServer 服務端主要實作處理,在這裡進行線程的啟動以及初始化,基本實作思路大體一緻:
1.開啟阻塞連接配接後,等待連接配接。
2.每當一個用戶端進行連接配接,就建立一個線程來單獨對此用戶端進行互動,同時在互相連接配接成功時将線程資訊存放到一個集合中,用于後期消息發送以及退出連接配接等操作。
3.廣播消息線程:該線程一開始是存放于上述線程步驟中的,然後發現每一個連接配接都會建立一個該線程,雖然功能目前不受影響但是總感覺不合理,是以就将該線程放在此處,因為部落客覺得該廣播消息處理器應該隻能有一個也就是單例的,如有不合理還請指出!
public class CoreServer {
//廣播消息發送線程,該處理器隻能有一個,應該設計成單例
private static final SendMessage sendActive = new SendMessage();
private ServerSocket server;
//用于存放目前連接配接的用戶端資訊。
public static List<ProcessorHandler> connectThread = new ArrayList<>();
CoreServer() throws IOException {
server = new ServerSocket(9999);
}
//初始化伺服器
@SneakyThrows
public void start() {
System.err.println("正在啟動加載伺服器.....");
//啟動定時器,按時重新整理線上人數
// timer();
//啟動 發送廣播消息線程
sendActive.start();
System.err.println("伺服器啟動成功!");
while (true) {
Socket socket = server.accept(); //等待連接配接
//每一個socket交給一個processorHandler處理,用來處理用戶端發來的消息
ProcessorHandler processorHandler = new ProcessorHandler(socket);
processorHandler.start();
}
}
// @Scheduled(cron = "0 */1 * * * ?")
// private void timer() {
// new Timer().schedule(new TimerTask() {
// @Override
// public void run() {
// try {
// //do Something
// System.err.println("系統消息:目前線上人數 ["+connectThread.size()+"] 人!");
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
// },0,1000*5);
//
// }
//在用戶端連接配接和退出時進行提醒
public static void onLineClient() {
System.err.println("系統消息:目前線上人數 [" + connectThread.size() + "] 人!");
}
public static void main(String[] args) throws IOException {
CoreServer coreServer = new CoreServer();
coreServer.start();
}
}
2、SendMessage 廣播消息線程
這塊就比較簡單,就是進行資料傳輸,大夥依舊看看就好
package com.chenxh.socket.handler;
import com.chenxh.socket.server.CoreServer;
import java.io.BufferedWriter;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Scanner;
public class SendMessage extends Thread {
@Override
public void run() {
Scanner scan = new Scanner(System.in);
while (true) {
String line = scan.nextLine();
//将伺服器消息分發給每個用戶端
CoreServer.connectThread.forEach(item -> {
sendActive(item.getSocket(), line);
});
}
}
public static void sendActive(Socket socket, String str) {
/*
* 串聯一個流連接配接,提升效率
*/
try {
if (!socket.isOutputShutdown()) {
OutputStream out = socket.getOutputStream();
OutputStreamWriter osw = new OutputStreamWriter(out, "UTF-8");
BufferedWriter bw = new BufferedWriter(osw);
//pw.print以及 bw.writer 都可以。兩個選一個就行
// PrintWriter pw = new PrintWriter(bw, true);//此處注意參數true,若不添加此方法我們需要頻繁調用flush方法來保證消息的及時性。
// pw.println(str);
bw.write(str);
bw.newLine();
bw.flush();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
3、與用戶端互動線程
該線程主要用于接收和處理用戶端發來的指令,目前程式控制的都是通過Scanner進行資料動态傳輸。
這塊也不難,代碼都有注釋。
主要功能:
1.當用戶端進行連接配接時會提示使用者登入并顯示線上人數
2.用戶端進行鍵盤輸入,在伺服器控制台進行檢視。
3.當用戶端輸入"end"時退出連接配接,并終止資料傳輸以及清除該連接配接資料。
package com.chenxh.socket.handler;
import com.chenxh.entity.User;
import com.chenxh.socket.server.CoreServer;
import lombok.Getter;
import org.apache.commons.lang3.ObjectUtils;
import java.io.*;
import java.net.Socket;
public class ProcessorHandler extends Thread {
private final ProcessorHandler processorHandler = this;
@Getter
private Socket socket;
private User user;
public ProcessorHandler(Socket socket) throws IOException, ClassNotFoundException {
this.socket = socket;
//在第一次連接配接時會調用該方法,并且第一連接配接時用戶端傳來的對象流我們再次進行讀取以及存儲
user = (User) new ObjectInputStream(socket.getInputStream()).readObject();
if (ObjectUtils.isEmpty(user)) {
throw new RuntimeException("deserializer exception please check params");
}
System.err.println("系統提示:" + "[" + user.getIp() + "]" + user.getUsername() + " 進入聊天室");
//将連接配接資訊添加到集合中
CoreServer.connectThread.add(processorHandler);
CoreServer.onLineClient();
}
@Override
public void run() {
//接收處理客戶發送來的消息線程
new ReceiveMessage().start();
}
//退出連接配接操作
public void exitServer() throws IOException {
//清楚存儲的連接配接資訊
CoreServer.connectThread.remove(processorHandler);
//停止對該socket的消息接收和發送。
socket.shutdownOutput();
socket.shutdownInput();
System.err.println("使用者:" + user.getUsername() + " 退出聊天室!");
//顯示線上人數
CoreServer.onLineClient();
}
class ReceiveMessage extends Thread {
@Override
public void run() {
try {
InputStream in = socket.getInputStream();
InputStreamReader isr = new InputStreamReader(in, "utf-8");
BufferedReader br = new BufferedReader(isr);
String str = null;
while ((str = br.readLine()) != null) {
if ("end".equals(str)) { //當用戶端輸入 end 認為用戶端退出連接配接
exitServer();
} else {
System.out.println("[" + user.getIp() + "]" + user.getUsername() + "說:" + str);
}
}
} catch (Exception e) {
//用于處理非正常退出操作(end)
try {
exitServer();
} catch (IOException ex) {
System.err.println("系統異常:使用者退出異常!");
}
}
}
}
}
socketclient
用戶端
1、Client
啟動類
建立連接配接,并交由線程單獨處理。
public class Client{
private Socket socket;
//緩存線程池
ExecutorService executorService = Executors.newCachedThreadPool();
public void connect() throws IOException {
socket = new Socket("127.0.0.1",9999);
User user = new User();
user.setUsername("Ccc");
executorService.execute(new UserThread(socket,user));
}
public static void main(String[] args) throws IOException {
Client client = new Client();
client.connect();
}
}
2、UserThread
使用者線程
用于處理将消息發送到用戶端以及收用戶端發送來的廣播消息。
package com.chenxh.socket.handler;
import com.chenxh.entity.User;
import lombok.SneakyThrows;
import java.io.*;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
public class UserThread implements Runnable {
private User user;
private Socket socket;
public UserThread(Socket socket, User user) throws IOException {
this.socket = socket;
this.user = user;
//第一次連接配接時将登陸進行傳到服務端。
new ObjectOutputStream(socket.getOutputStream()).writeObject(new User("Ccc", "", socket.getInetAddress().getHostAddress()));
}
@SneakyThrows
@Override
public void run() {
//發送到伺服器
new SendMessage().start();
//接收廣播消息
new ReceiveMessage().start();
}
//發送線程
class SendMessage extends Thread{
@Override
public void run() {
try {
/*
* 串聯一個流連接配接,提升效率
*/
OutputStream out = socket.getOutputStream();
OutputStreamWriter osw = new OutputStreamWriter(out, "UTF-8");
BufferedWriter bw = new BufferedWriter(osw);
PrintWriter pw = new PrintWriter(bw, true);//此處注意參數true,若不添加此方法我們需要頻繁調用flush方法來保證消息的及時性。
Scanner scan = new Scanner(System.in);
while (true) {
String line = scan.nextLine();
bw.write(line);
bw.newLine();
bw.flush();
}
} catch (Exception e) {
System.err.println("消息提示:您已從伺服器斷開連接配接!");
}
}
}
//接收廣播消息線程
class ReceiveMessage extends Thread{
@Override
public void run() {
try {
InputStream in = socket.getInputStream();
InputStreamReader isr = new InputStreamReader(in, "utf-8");
BufferedReader br = new BufferedReader(isr);
String str = null;
while ((str = br.readLine()) != null) {
System.out.println("[廣播消息]:"+ str);
}
} catch (Exception e) {
System.err.println("消息提示:您已從伺服器斷開連接配接!");
}
}
}
}
代碼效果執行個體:
伺服器啟動連接配接:
消息發送:
廣播消息
正常退出:
點選終止異常退出
伺服器異常:
沒法上傳視訊 , – !大夥就将就看看。若發現不正确不合理的地方,還請能指點一二!