天天看点

Socket实现聊天室(二)

午安,打工人!

最近终于终于有点闲空了,就翻了翻之前写的学习笔记,就发现了上一篇文章。依稀记得当时还是在学校里费了不少功夫才完成的,就突然想拿出来重新回顾一下顺便改进改进,如若您发现有错误或者不合理的地方,恳请指点一二,在此感谢!

前言

这一类文章暂时没有提供可视化界面,我会在基础功能差不多具备的情况下进行设计和开发。

本篇文章思路可以参考上一篇,主要是对代码进行修改,总体还是不难的。

目录

Socket实现聊天室(二)

common : 设计主要是用来存储用户信息的以及后期一些公共类。

socketclient:客户端

websocket: 服务端

common模块

目前就存放一个用户,大伙就随便看看就行。

@Data
public class User implements Serializable {
        private String username;
        private String message;
        private String ip;

        public User(String username,String message,String ip){
                this.username = username;
                this.message = message;
                this.ip = ip;
        }
        public User(){

        }
}
           

websocket

1、CoreServer

CoreServer 服务端主要实现处理,在这里进行线程的启动以及初始化,基本实现思路大体一致:

1.开启阻塞连接后,等待连接。

2.每当一个客户端进行连接,就创建一个线程来单独对此客户端进行交互,同时在相互连接成功时将线程信息存放到一个集合中,用于后期消息发送以及退出连接等操作。

3.广播消息线程:该线程一开始是存放于上述线程步骤中的,然后发现每一个连接都会创建一个该线程,虽然功能目前不受影响但是总感觉不合理,所以就将该线程放在此处,因为博主觉得该广播消息处理器应该只能有一个也就是单例的,如有不合理还请指出!

public class CoreServer {

    //广播消息发送线程,该处理器只能有一个,应该设计成单例
    private static final SendMessage sendActive = new SendMessage();
    private ServerSocket server;
    //用于存放当前连接的客户端信息。
    public static List<ProcessorHandler> connectThread = new ArrayList<>();

    CoreServer() throws IOException {
        server = new ServerSocket(9999);

    }

    //初始化服务器
    @SneakyThrows
    public void start() {
        System.err.println("正在启动加载服务器.....");
        //启动定时器,按时刷新在线人数
//        timer();
        //启动 发送广播消息线程
        sendActive.start();
        System.err.println("服务器启动成功!");
        while (true) {
            Socket socket = server.accept(); //等待连接
            //每一个socket交给一个processorHandler处理,用来处理客户端发来的消息
            ProcessorHandler processorHandler = new ProcessorHandler(socket);
            processorHandler.start();

        }
    }

    //    @Scheduled(cron = "0 */1 * * * ?")
//    private void timer() {
//        new Timer().schedule(new TimerTask() {
//            @Override
//            public void run() {
//                try {
//                    //do Something
//                    System.err.println("系统消息:当前在线人数 ["+connectThread.size()+"] 人!");
//                } catch (Exception e) {
//                    e.printStackTrace();
//                }
//            }
//        },0,1000*5);
//
//    }

    //在客户端连接和退出时进行提醒
    public static void onLineClient() {
        System.err.println("系统消息:当前在线人数 [" + connectThread.size() + "] 人!");
    }


    public static void main(String[] args) throws IOException {
        CoreServer coreServer = new CoreServer();
        coreServer.start();
    }
}
           
2、SendMessage 广播消息线程

这块就比较简单,就是进行数据传输,大伙依旧看看就好

package com.chenxh.socket.handler;

import com.chenxh.socket.server.CoreServer;

import java.io.BufferedWriter;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Scanner;

public class SendMessage extends Thread {
    @Override
    public void run() {
        Scanner scan = new Scanner(System.in);
        while (true) {
            String line = scan.nextLine();
            //将服务器消息分发给每个客户端
            CoreServer.connectThread.forEach(item -> {
                sendActive(item.getSocket(), line);
            });
        }
    }

    public static void sendActive(Socket socket, String str) {
        /*
         * 串联一个流连接,提升效率
         */
        try {
            if (!socket.isOutputShutdown()) {
                OutputStream out = socket.getOutputStream();
                OutputStreamWriter osw = new OutputStreamWriter(out, "UTF-8");
                BufferedWriter bw = new BufferedWriter(osw);
                //pw.print以及 bw.writer 都可以。两个选一个就行
//                PrintWriter pw = new PrintWriter(bw, true);//此处注意参数true,若不添加此方法我们需要频繁调用flush方法来保证消息的及时性。
//                pw.println(str);
                bw.write(str);
                bw.newLine();
                bw.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

           
3、与客户端交互线程

该线程主要用于接收和处理客户端发来的指令,目前程序控制的都是通过Scanner进行数据动态传输。

这块也不难,代码都有注释。

主要功能:

1.当客户端进行连接时会提示用户登录并显示在线人数

2.客户端进行键盘输入,在服务器控制台进行查看。

3.当客户端输入"end"时退出连接,并终止数据传输以及清除该连接数据。

package com.chenxh.socket.handler;


import com.chenxh.entity.User;
import com.chenxh.socket.server.CoreServer;
import lombok.Getter;
import org.apache.commons.lang3.ObjectUtils;

import java.io.*;
import java.net.Socket;

public class ProcessorHandler extends Thread {

    private final ProcessorHandler processorHandler = this;
    @Getter
    private Socket socket;
    private User user;

    public ProcessorHandler(Socket socket) throws IOException, ClassNotFoundException {
        this.socket = socket;
        //在第一次连接时会调用该方法,并且第一连接时客户端传来的对象流我们再次进行读取以及存储
        user = (User) new ObjectInputStream(socket.getInputStream()).readObject();
        if (ObjectUtils.isEmpty(user)) {
            throw new RuntimeException("deserializer exception please check params");
        }
        System.err.println("系统提示:" + "[" + user.getIp() + "]" + user.getUsername() + " 进入聊天室");
        //将连接信息添加到集合中
        CoreServer.connectThread.add(processorHandler);
        CoreServer.onLineClient();

    }

    @Override
    public void run() {
        //接收处理客户发送来的消息线程
        new ReceiveMessage().start();
    }

    //退出连接操作
    public void exitServer() throws IOException {
        //清楚存储的连接信息
        CoreServer.connectThread.remove(processorHandler);
        //停止对该socket的消息接收和发送。
        socket.shutdownOutput();
        socket.shutdownInput();
        System.err.println("用户:" + user.getUsername() + " 退出聊天室!");
        //显示在线人数
        CoreServer.onLineClient();
    }

    class ReceiveMessage extends Thread {
        @Override
        public void run() {
            try {
                InputStream in = socket.getInputStream();
                InputStreamReader isr = new InputStreamReader(in, "utf-8");
                BufferedReader br = new BufferedReader(isr);
                String str = null;
                while ((str = br.readLine()) != null) {
                    if ("end".equals(str)) { //当客户端输入 end 认为客户端退出连接
                        exitServer();
                    } else {
                        System.out.println("[" + user.getIp() + "]" + user.getUsername() + "说:" + str);
                    }
                }
            } catch (Exception e) {
                //用于处理非正常退出操作(end)
                try {
                    exitServer();
                } catch (IOException ex) {
                    System.err.println("系统异常:用户退出异常!");
                }
            }
        }
    }
}

           

socketclient

客户端

1、Client

启动类

建立连接,并交由线程单独处理。

public class Client{


    private Socket socket;
    //缓存线程池
    ExecutorService executorService = Executors.newCachedThreadPool();

   public void connect() throws IOException {
       socket = new Socket("127.0.0.1",9999);
       User user = new User();
       user.setUsername("Ccc");
       executorService.execute(new UserThread(socket,user));




   }


    public static void main(String[] args) throws IOException {
        Client client = new Client();
        client.connect();
    }

}
           
2、UserThread

用户线程

用于处理将消息发送到客户端以及收客户端发送来的广播消息。

package com.chenxh.socket.handler;

import com.chenxh.entity.User;
import lombok.SneakyThrows;

import java.io.*;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class UserThread implements Runnable {
    private User user;
    private Socket socket;

    public UserThread(Socket socket, User user) throws IOException {
        this.socket = socket;
        this.user = user;
        //第一次连接时将登陆进行传到服务端。
        new ObjectOutputStream(socket.getOutputStream()).writeObject(new User("Ccc", "", socket.getInetAddress().getHostAddress()));
    }


    @SneakyThrows
    @Override
    public void run() {
        //发送到服务器
        new SendMessage().start();
        //接收广播消息
        new ReceiveMessage().start();
    }
    //发送线程
    class SendMessage extends Thread{
        @Override
        public void run() {
            try {
                /*
                 * 串联一个流连接,提升效率
                 */
                OutputStream out = socket.getOutputStream();
                OutputStreamWriter osw = new OutputStreamWriter(out, "UTF-8");
                BufferedWriter bw = new BufferedWriter(osw);
                PrintWriter pw = new PrintWriter(bw, true);//此处注意参数true,若不添加此方法我们需要频繁调用flush方法来保证消息的及时性。
                Scanner scan = new Scanner(System.in);
                while (true) {
                    String line = scan.nextLine();
                    bw.write(line);
                    bw.newLine();
                    bw.flush();
                }
            } catch (Exception e) {
                System.err.println("消息提示:您已从服务器断开连接!");
            }
        }
    }
    //接收广播消息线程
    class ReceiveMessage extends Thread{
        @Override
        public void run() {
            try {

                InputStream in = socket.getInputStream();
                InputStreamReader isr = new InputStreamReader(in, "utf-8");
                BufferedReader br = new BufferedReader(isr);
                String str = null;
                while ((str = br.readLine()) != null) {
                    System.out.println("[广播消息]:"+ str);

                }
            } catch (Exception e) {
                System.err.println("消息提示:您已从服务器断开连接!");
            }
        }
    }
}

           

代码效果实例:

服务器启动连接:

Socket实现聊天室(二)

消息发送:

Socket实现聊天室(二)
Socket实现聊天室(二)
Socket实现聊天室(二)

广播消息

Socket实现聊天室(二)
Socket实现聊天室(二)

正常退出:

Socket实现聊天室(二)
Socket实现聊天室(二)

点击终止异常退出

Socket实现聊天室(二)
Socket实现聊天室(二)

服务器异常:

Socket实现聊天室(二)
Socket实现聊天室(二)

没法上传视频 , – !大伙就将就看看。若发现不正确不合理的地方,还请能指点一二!