天天看點

基于netty的websocket服務

我們下面則将一些實際場景都添加進去,比如使用者身份的驗證,遊客隻能浏覽不能發言,多房間(頻道)的聊天。

該部落格非常适合 Java 新手,非常适合作為學習 Java 的切入點,不需要考慮tomcat、spring、mybatis等。

唯一的知識點就是 maven 的基礎使用。

├── WebSocketServer.java                啟動伺服器端口監聽

├── WebSocketServerInitializer.java     初始化服務

├── WebSocketServerHandler.java         接管WebSocket資料連接配接

├── dto

│   └── Response.java                   傳回給用戶端資料對象

├── entity

│   └── Client.java                     每個連接配接到WebSocket服務的用戶端對象

└── service

    ├── MessageService.java             完成發送消息

    └── RequestService.java             WebSocket初始化連接配接握手時的資料處理

功能設計概述

身份認證

用戶端将使用者 id 、進入的房間的 rid、使用者 token 

json_encode

,例如

{id:1;rid:21;token:'43606811c7305ccc6abb2be116579bfd'}

。然後在 

base64

 處理,通過參數

request

傳到伺服器,然後在伺服器做 id 和 token 的驗證(我的做法是 token 存放在redis string 5秒的過期時間)

房間表

使用一個Map 

channelGroupMap

 來存放各個房間(頻道),以用戶端傳握手時傳過來的 base64 字元串中擷取到定義的房間 ID,然後為該房間 ID 建立一個

ChannelGroup

ChannelGroup

 友善對該組内的所有用戶端廣播消息)

在 pom.xml 中引入netty 5

<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>5.0.0.Alpha2</version>
    </dependency>
    <dependency>
        <groupId>com.jcraft</groupId>
        <artifactId>jzlib</artifactId>
        <version>1.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20141113</version>
    </dependency>
    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.10</version>
    </dependency>
</dependencies>
           

建立伺服器

這段代碼需要了解嗎?這是 netty 的套路,可以先記住 netty 的線程模型是一個 react 的一種變型,這裡有兩個nio線程組,一個是接受用戶端的請求,一個是worker組專門處理用戶端的請求。

可以簡單的了解下面的代碼就建構了一個nginx伺服器。是以不用管。

package net.mengkang;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.Channel;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.logging.LogLevel;

import io.netty.handler.logging.LoggingHandler;

public final class WebSocketServer {

    private static final int PORT = 8083;

    public static void main(String[] args) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);

        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup)

                    .channel(NioServerSocketChannel.class)

                    .handler(new LoggingHandler(LogLevel.INFO))

                    .childHandler(new WebSocketServerInitializer());

            Channel ch = b.bind(PORT).sync().channel();

            ch.closeFuture().sync();

        } finally {

            bossGroup.shutdownGracefully();

            workerGroup.shutdownGracefully();

        }

    }

}

package net.mengkang;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelPipeline;

import io.netty.channel.socket.SocketChannel;

import io.netty.handler.codec.http.HttpObjectAggregator;

import io.netty.handler.codec.http.HttpServerCodec;

import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override

    public void initChannel(SocketChannel ch) throws Exception {

        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast(new HttpServerCodec());

        pipeline.addLast(new HttpObjectAggregator(65536));

        pipeline.addLast(new WebSocketServerCompressionHandler());

        pipeline.addLast(new WebSocketServerHandler());

    }

}

處理長連接配接

下面程式中最的處理在握手階段

handleHttpRequest

,裡面處理參數的判斷,使用者的認證,登入使用者表的維護,直播房間表維護。詳細的請大家對照代碼來浏覽。

握手完成之後的消息傳遞則在

handleWebSocketFrame

中處理。

整理的執行流程,大家可以對各個方法打斷點予以調試,就會很清楚整個執行的脈絡啦。

package net.mengkang;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.*;

import io.netty.channel.group.ChannelGroup;

import io.netty.channel.group.DefaultChannelGroup;

import io.netty.handler.codec.http.*;

import io.netty.handler.codec.http.websocketx.*;

import io.netty.util.CharsetUtil;

import io.netty.util.concurrent.GlobalEventExecutor;

import net.mengkang.dto.Response;

import net.mengkang.entity.Client;

import net.mengkang.service.MessageService;

import net.mengkang.service.RequestService;

import org.json.JSONObject;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import static io.netty.handler.codec.http.HttpHeaderNames.HOST;

import static io.netty.handler.codec.http.HttpMethod.GET;

import static io.netty.handler.codec.http.HttpResponseStatus.*;

import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

    // websocket 服務的 uri

    private static final String WEBSOCKET_PATH = "/websocket";

    // 一個 ChannelGroup 代表一個直播頻道

    private static Map<Integer, ChannelGroup> channelGroupMap = new ConcurrentHashMap <>();

    // 本次請求的 code

    private static final String HTTP_REQUEST_STRING = "request";

    private Client client = null;

    private WebSocketServerHandshaker handshaker;

    @Override

    public void messageReceived(ChannelHandlerContext ctx, Object msg) {

        if (msg instanceof FullHttpRequest) {

            handleHttpRequest(ctx, (FullHttpRequest) msg);

        } else if (msg instanceof WebSocketFrame) {

            handleWebSocketFrame(ctx, (WebSocketFrame) msg);

        }

    }

    @Override

    public void channelReadComplete(ChannelHandlerContext ctx) {

        ctx.flush();

    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {

        // Handle a bad request.

        if (!req.decoderResult().isSuccess()) {

            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));

            return;

        }

        // Allow only GET methods.

        if (req.method() != GET) {

            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));

            return;

        }

        if ("/favicon.ico".equals(req.uri()) || ("/".equals(req.uri()))) {

            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND));

            return;

        }

        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());

        Map<String, List<String>> parameters = queryStringDecoder.parameters();

        if (parameters.size() == 0 || !parameters.containsKey(HTTP_REQUEST_STRING)) {

            System.err.printf(HTTP_REQUEST_STRING + "參數不可預設");

            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND));

            return;

        }

        client = RequestService.clientRegister(parameters.get(HTTP_REQUEST_STRING).get(0));

        if (client.getRoomId() == 0) {

            System.err.printf("房間号不可預設");

            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND));

            return;

        }

        // 房間清單中如果不存在則為該頻道,則新增一個頻道 ChannelGroup

        if (!channelGroupMap.containsKey(client.getRoomId())) {

            channelGroupMap.put(client.getRoomId(), new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));

        }

        // 确定有房間号,才将用戶端加入到頻道中

        channelGroupMap.get(client.getRoomId()).add(ctx.channel());

        // Handshake

        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req), null, true);

        handshaker = wsFactory.newHandshaker(req);

        if (handshaker == null) {

            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());

        } else {

            ChannelFuture channelFuture = handshaker.handshake(ctx.channel(), req);

            // 握手成功之後,業務邏輯

            if (channelFuture.isSuccess()) {

                if (client.getId() == 0) {

                    System.out.println(ctx.channel() + " 遊客");

                    return;

                }

            }

        }

    }

    private void broadcast(ChannelHandlerContext ctx, WebSocketFrame frame) {

        if (client.getId() == 0) {

            Response response = new Response(1001, "沒登入不能聊天哦");

            String msg = new JSONObject(response).toString();

            ctx.channel().write(new TextWebSocketFrame(msg));

            return;

        }

        String request = ((TextWebSocketFrame) frame).text();

        System.out.println(" 收到 " + ctx.channel() + request);

        Response response = MessageService.sendMessage(client, request);

        String msg = new JSONObject(response).toString();

        if (channelGroupMap.containsKey(client.getRoomId())) {

            channelGroupMap.get(client.getRoomId()).writeAndFlush(new TextWebSocketFrame(msg));

        }

    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

        if (frame instanceof CloseWebSocketFrame) {

            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());

            return;

        }

        if (frame instanceof PingWebSocketFrame) {

            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));

            return;

        }

        if (!(frame instanceof TextWebSocketFrame)) {

            throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));

        }

        broadcast(ctx, frame);

    }

    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {

        if (res.status().code() != 200) {

            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);

            res.content().writeBytes(buf);

            buf.release();

            HttpHeaderUtil.setContentLength(res, res.content().readableBytes());

        }

        ChannelFuture f = ctx.channel().writeAndFlush(res);

        if (!HttpHeaderUtil.isKeepAlive(req) || res.status().code() != 200) {

            f.addListener(ChannelFutureListener.CLOSE);

        }

    }

    @Override

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

        cause.printStackTrace();

        ctx.close();

    }

    @Override

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

        Channel incoming = ctx.channel();

        System.out.println("收到" + incoming.remoteAddress() + " 握手請求");

    }

    @Override

    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

        if (client != null && channelGroupMap.containsKey(client.getRoomId())) {

            channelGroupMap.get(client.getRoomId()).remove(ctx.channel());

        }

    }

    private static String getWebSocketLocation(FullHttpRequest req) {

        String location = req.headers().get(HOST) + WEBSOCKET_PATH;

        return "ws://" + location;

    }

}

伺服器端就寫完啦,還有一些用戶端對象的構想驗證什麼的就不一一細說了,都很簡單,都在代碼裡。下面是用戶端。

<html>

<head><title></title></head>

<body>

<script type="text/javascript">

var socket;

if (!window.WebSocket) {

  window.WebSocket = window.MozWebSocket;

}

if (window.WebSocket) {

  socket = new WebSocket("ws://localhost:8083/websocket/?request=e2lkOjE7cmlkOjI2O3Rva2VuOiI0MzYwNjgxMWM3MzA1Y2NjNmFiYjJiZTExNjU3OWJmZCJ9");

  socket.onmessage = function(event) {

      console.log(event.data);

  };

  socket.onopen = function(event) {

    console.log("websocket 打開了");

  };

  socket.onclose = function(event) {

    console.log("websocket 關閉了");

  };

}

function send(message) {

  if (!window.WebSocket) { return; }

  if (socket.readyState == WebSocket.OPEN) {

    socket.send(message);

  } else {

    alert("The socket is not open.");

  }

}

</script>

<form οnsubmit="return false;">

  <input type="text" name="message" value="Hello, World!"/>

  <input type="button" value="Send Web Socket Data" οnclick="send(this.form.message.value)" />

</form>

</body>

</html>