天天看点

springmvc+maven+netty-socketio服务端构建实时通信

1.maven引入依赖jar包

 <dependency>

          <groupId>com.corundumstudio.socketio</groupId>

          <artifactId>netty-socketio</artifactId>

          <version>1.7.7</version>

</dependency>

socketio服务端:SocketIo_Server.java

import java.util.Map;

import com.corundumstudio.socketio.AckRequest;

import com.corundumstudio.socketio.Configuration;

import com.corundumstudio.socketio.SocketIOClient;

import com.corundumstudio.socketio.SocketIOServer;

import com.corundumstudio.socketio.listener.ConnectListener;

import com.corundumstudio.socketio.listener.DataListener;

import com.corundumstudio.socketio.listener.DisconnectListener;

public class SocketIo_Server {

    public static void main(String[] args) throws InterruptedException {

        Configuration config = new Configuration();

        //服务器主机ip,这里配置本机

        config.setHostname("localhost");

        //端口,任意

        config.setPort(9092);

        config.setMaxFramePayloadLength(1024 * 1024);

        config.setMaxHttpContentLength(1024 * 1024);

        SocketIOServer server = new SocketIOServer(config);

        //监听广告推送事件,advert_info为事件名称,自定义

        server.addEventListener("advert_info", String.class, new DataListener<String>(){

            @Override

            public void onData(SocketIOClient client, String data, AckRequest ackRequest) throws ClassNotFoundException {

                //客户端推送advert_info事件时,onData接受数据,这里是string类型的json数据,还可以为Byte[],object其他类型

                String sa = client.getRemoteAddress().toString();

                String clientIp = sa.substring(1,sa.indexOf(":"));//获取客户端连接的ip

                Map params = client.getHandshakeData().getUrlParams();//获取客户端url参数

                System.out.println(clientIp+":客户端:************"+data);

            }

        });

        //监听通知事件

        server.addEventListener("notice_info", String.class, new DataListener<String>() {

            @Override    

            public void onData(SocketIOClient client, String data, AckRequest ackRequest) {

                //同上

        /**

         * 监听其他事件

         */

        //添加客户端连接事件

        server.addConnectListener(new ConnectListener() {

            @Override

            public void onConnect(SocketIOClient client) {

                // TODO Auto-generated method stub

                String sa = client.getRemoteAddress().toString();

                String clientIp = sa.substring(1,sa.indexOf(":"));//获取设备ip

                System.out.println(clientIp+"-------------------------"+"客户端已连接");

                Map params = client.getHandshakeData().getUrlParams();

                //给客户端发送消息

                client.sendEvent("advert_info",clientIp+"客户端你好,我是服务端,有什么能帮助你的?");

            }

        });

        //添加客户端断开连接事件

        server.addDisconnectListener(new DisconnectListener(){

            public void onDisconnect(SocketIOClient client) {

                System.out.println(clientIp+"-------------------------"+"客户端已断开连接");

                client.sendEvent("advert_info",clientIp+"客户端你好,我是服务端,期待下次和你见面");

          server.start();

        Thread.sleep(Integer.MAX_VALUE);

        server.stop();

    }

}

socketio客户端:SocketIo_Client.java

import io.socket.client.IO;

import io.socket.client.Socket;

import io.socket.emitter.Emitter;

public class SocketIo_Client {

    public static void main(String[] args) {

        try{

            IO.Options options = new IO.Options();    

            options.forceNew = true;

            options.reconnection = true;

            final Socket socket = IO.socket("http://localhost:9092?deviceId=ZYLPC", options);

            socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {

                @Override

                public void call(Object... args) {

                    System.out.println("connect");

//                    socket.close();

                }

            }).on(Socket.EVENT_CONNECT_TIMEOUT, new Emitter.Listener() {

                @Override 

                    System.out.println("connect timeout");

            }).on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() {

                @Override    

                    System.out.println("connect error");

            }).on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {

                public void call(Object... args) {    

                    System.out.println("disconnect");

            }).on("advert_info", new Emitter.Listener() {

                @Override

                public void call(Object... args) {

                    String data = (String)args[0];

                    System.out.println("服务端:************"+data.toString());

                    //给服务端发送信息

                    socket.emit("advert_info", "服务端你好,我是客户端,我有问题想咨询你!");

                }

            }).on("notice_info", new Emitter.Listener(){

                public void call(Object... args){

            });

            socket.open();

        }catch(Exception e){

        }

与spring集成:

服务层:SocketIoService.java

import java.util.Collection;

import java.util.HashMap;

import java.util.List;

import org.springframework.stereotype.Service;

@Service("socketIoService")

public class SocketIoService {

    static SocketIOServer server;

    static Map<String, SocketIOClient> clientsMap = new HashMap<String, SocketIOClient>();

    public void startServer() throws InterruptedException{

        //服务器主机ip    

        //端口

        server = new SocketIOServer(config);

                //获取客户端连接的uuid参数

                Object object = params.get("uuid");

                String uuid = "";

                if(object != null){

                    uuid = ((List<String>)object).get(0);

                    //将uuid和连接客户端对象进行绑定

                    clientsMap.put(uuid,client);

                }

    public void stopServer(){

        if(server != null){

            server.stop();

            server = null;

    /**

     *  给所有连接客户端推送消息

     * @param eventType 推送的事件类型

     * @param message  推送的内容

     */

    public void sendMessageToAllClient(String eventType,String message){

        Collection<SocketIOClient> clients = server.getAllClients();

        for(SocketIOClient client: clients){

            client.sendEvent(eventType,message);

     * 给具体的客户端推送消息

     * @param deviceId 设备类型

     * @param eventType推送事件类型

     * @param message 推送的消息内容

    public void sendMessageToOneClient(String uuid,String eventType,String message){

        try {

            if(uuid != null && !"".equals(uuid)){

                SocketIOClient client = (SocketIOClient)clientsMap.get(uuid);

                if(client != null){

                    client.sendEvent(eventType,message);

        } catch (Exception e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

   public static SocketIOServer getServer() {

        return server;

控制层层:SocketIoController.java

import javax.servlet.http.HttpServletRequest;

import javax.servlet.http.HttpServletResponse;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.RequestMapping;

@Controller

public class SocketIoController {

    @Autowired

    private SocketIoService service;

    //启动socket 服务

    @RequestMapping("startServer")

    public void startServer(HttpServletRequest request,HttpServletResponse response) throws Exception{

        Map params = ReflectUtil.transToMAP(request.getParameterMap());

            if(service.getServer() == null){

                new Thread(new Runnable() {

                    @Override

                    public void run() {

                        // TODO Auto-generated method stub

                        try {

                            service.startServer();

                        } catch (InterruptedException e) {

                            // TODO Auto-generated catch block

                            e.printStackTrace();

                        }

                    }

                }).start();

    //停止socket服务

    @RequestMapping("stopServer")

    public void stopServer(HttpServletRequest request,HttpServletResponse response) throws Exception{

                service.stopServer();

    //给指定的客户端推送消息

    @RequestMapping("sendAdvertInfoMsg")

    public void sendAdvertInfoMsg(HttpServletRequest request,HttpServletResponse response) throws Exception{

        String uuid = ParamsUtil.nullDeal(params, "uuid", "");

            if(!"".equals(uuid) && service.getServer() != null){

                service.sendMessageToOneClient(uuid, "advert_info", "推送的内容");

如果想在spring容器启动之后启动sockerio,可以这样做:

自定义一个类,用@component注入

@component (把普通pojo实例化到spring容器中,相当于配置文件中的<bean id="" class=""/>      

实现spring  ApplicationListener接口,这样在spring加载成功之后就会调用onApplicationEvent方法启动socketio

import org.springframework.context.ApplicationListener;

import org.springframework.context.event.ContextRefreshedEvent;

import org.springframework.stereotype.Component;

import com.zkkj.backend.common.socketio.BinaryEventLauncher;

import com.zkkj.backend.service.biz.advert.IAdvertService;

/**

 * spring加载完毕后执行

 * @author ZYL_PC

 *

 */

@Component("BeanDefineConfigue")

public class BeanDefineConfigue  implements ApplicationListener<ContextRefreshedEvent>{

    @Autowired

    //当前服务器的ip

    private String serverIp = "";

    //当前服务器设备id

    private String deviceId = "";

    //执行时间,时间单位为毫秒,读者可自行设定,不得小于等于0

    private static Long cacheTime = Long.MAX_VALUE;

    //延迟时间,时间单位为毫秒,读者可自行设定,不得小于等于0

    private static Integer delay = 3000;

    @Override

    public void onApplicationEvent(ContextRefreshedEvent event) {

        // TODO Auto-generated method stub

        Timer timer = new Timer();

        timer.scheduleAtFixedRate(new TimerTask() {

          public void run() {

            //启动socket监听

              try{

                  if(service.getServer() == null){

                      new Thread(new Runnable() {

                          @Override

                          public void run() {

                              try {

                                  service.startServer();

                              } catch (InterruptedException e) {

                                  e.printStackTrace();

                              }

                          }

                      }).start();

                  }

              }catch(Exception e){

              }

          }

        }, delay,cacheTime);// 这里设定将延时每天固定执行