天天看點

Netty 實作 UDP 通訊一、UDP 基礎知識二、程式實作幾個小問題小結

一、UDP 基礎知識

UDP 是User Datagram Protocol 的簡稱, 翻譯為使用者資料報協定。

UDP 是一種無連接配接的傳輸協定,應用程式無需建立連接配接就可以發送資料報。

UDP 有三種通訊方式:單點傳播、多點傳播、廣播。

1. 通訊方式

  • 單點傳播 通過指定通訊主機 IP 和端口, 可以實作将消息發送到指定主機。
  • 多點傳播 資料收發僅在指定分組中進行,其他未加入分組的主機不能收發對應的資料。
  • 廣播 将消息發送到同一廣播網絡中每個主機。

2. UDP 位址

UDP 采用的也是類似于 IP 一樣的位址,無需在作業系統中設定。隻需要在應用程式中使用,且與 網卡 IP 位址不沖突。

廣播位址

UDP 使用的廣播位址為:255.255.255.255, 注意:本地廣播資訊不會被路由器轉發。

多點傳播位址

D 類位址用于多點傳播,D 類位址範圍為 224.0.0.0 ~ 239.255.255.255,這些位址又劃分為以下 4 類:

位址 說明
224.0.0.0~224.0.0.255 為預留的多點傳播位址(永久組位址),位址224.0.0.0保留不做配置設定,其它位址供路由協定使用
224.0.1.0~224.0.1.255 是公用多點傳播位址,可以用于 Internet;欲使用需申請
224.0.2.0~238.255.255.255 為使用者可用的多點傳播位址(臨時組位址),全網範圍内有效
239.0.0.0~239.255.255.255 為本地管理多點傳播位址,僅在特定的本地範圍内有效

3. UDP 資料報

資料報長度

包含報頭在内的資料報的最大長度為 64 K, 一些實際應用往往會限制資料報的大小,有時會降低到 8192 位元組。UDP 資訊包的标題很短,隻有 8 個位元組,相對于 TCP 的 20 個位元組資訊包而言,UDP 的額外開銷很小。

資料報特征

UDP 封包沒有可靠性保證、順序保證和流量控制字段等,可靠性較差。

二、程式實作

服務端

import io.netty.bootstrap.Bootstrap
import io.netty.channel.ChannelFactory
import io.netty.channel.ChannelInitializer
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.InternetProtocolFamily
import io.netty.channel.socket.nio.NioDatagramChannel
import io.netty.util.NetUtil
import udp.UdpChannelInboundHandler

public class Server {

    static void main(String[] args) {

        Server server = new Server()

        InetSocketAddress  address = new InetSocketAddress("239.8.8.1", 51888)

        server.run(address)
    }


    void run(InetSocketAddress groupAddress) {

        EventLoopGroup group = new NioEventLoopGroup()

        try {

            Bootstrap b = new Bootstrap()

            b.group(group)
                    .channelFactory(new ChannelFactory<NioDatagramChannel>() {
                        @Override
                        NioDatagramChannel newChannel() {
                            return new NioDatagramChannel(InternetProtocolFamily.IPv4)
                        }
                    })
                    .handler(new ChannelInitializer<NioDatagramChannel>() {
                        @Override
                        void initChannel(NioDatagramChannel ch) throws Exception {
                            ch.pipeline().addLast(new UdpChannelInboundHandler(true))
                        }
                    })

            NioDatagramChannel ch = (NioDatagramChannel) b.bind(groupAddress.getPort()).sync().channel()


            NetworkInterface ni = NetUtil.LOOPBACK_IF

            println "$ni.name : $ni.displayName"

            ch.joinGroup(groupAddress, ni).sync()

            println "udp server($groupAddress.hostName:$groupAddress.port) is running..."

            ch.closeFuture().await()

        } catch (InterruptedException e) {
            e.printStackTrace()
        } catch (Exception e) {
            e.printStackTrace()
        } finally {
            group.shutdownGracefully()
        }
    }
}
           

運作結果:

udp server(239.8.8.8:51888) is started…

recv: msg 0

recv: msg 1

recv: msg 2

recv: msg 3

recv: msg 4

用戶端

package udp

import io.netty.bootstrap.Bootstrap
import io.netty.buffer.ByteBuf
import io.netty.buffer.UnpooledByteBufAllocator
import io.netty.channel.Channel
import io.netty.channel.ChannelInitializer
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.DatagramPacket
import io.netty.channel.socket.nio.NioDatagramChannel
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import io.netty.util.CharsetUtil

class UdpClient {

    Channel channel

    /**
     * 點對點
     */
    InetSocketAddress remoteAddress = new InetSocketAddress("localhost", 51888)

    /** 廣播位址
     InetSocketAddress remoteAddress = new InetSocketAddress("255.255.255.255", 9000)
     */

    /** 多點傳播位址
     InetSocketAddress remoteAddress = new InetSocketAddress("239.8.8.1", 9000)
     */

    void startClient() {

        NioEventLoopGroup group = new NioEventLoopGroup()

        Bootstrap b = new Bootstrap()
        b.group(group)
                .channel(NioDatagramChannel.class)
                .handler(new ChannelInitializer() {
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline()
                                .addLast("recv", new UdpChannelInboundHandler())
                    }
                })
        channel = (NioDatagramChannel) b.bind(8888).sync().channel()
    }

    void sendMsg(String msg) {

        ByteBuf buf = new UnpooledByteBufAllocator(true).buffer()
        buf.writeCharSequence(msg, CharsetUtil.UTF_8)

        def packet = new DatagramPacket(buf, remoteAddress)

        channel.writeAndFlush(packet).sync()
    }

    static void main(String[] args) {

        UdpClient client = new UdpClient()

        client.startClient()

        for (int i = 0; i < 5; i++) {
            def msg = "msg $i"
            println "send msg: $msg"
            client.sendMsg(msg)
        }

        println "send finish.."
    }
}

           

運作結果:

send msg: msg 0

recv: ok

send msg: msg 1

recv: ok

send msg: msg 2

recv: ok

send msg: msg 3

recv: ok

send msg: msg 4

recv: ok

send finish…

編解碼器

package udp

import io.netty.buffer.ByteBuf
import io.netty.buffer.UnpooledByteBufAllocator
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.channel.socket.DatagramPacket
import io.netty.util.CharsetUtil

class UdpChannelInboundHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    boolean rep

    UdpChannelInboundHandler(boolean rep = false) {
        this.rep = rep
    }

    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {

        def buf = msg.content()
        String strMsg = buf.toString(CharsetUtil.UTF_8)
        println "recv: $strMsg"


        if(rep) {
            ByteBuf buf1 = new UnpooledByteBufAllocator(true).buffer()
            buf1.writeCharSequence("ok", CharsetUtil.UTF_8)

            def packet = new DatagramPacket(buf1, msg.sender())

            ctx.writeAndFlush(packet).sync()
        }
    }
}
           

幾個小問題

IllegalArgumentException:IPv6 socket cannot join IPv4 multicast group

服務啟動時,可能會抛出以下異常:

java.lang.IllegalArgumentException: IPv6 socket cannot join IPv4 multicast group

  at sun.nio.ch.DatagramChannelImpl.innerJoin(DatagramChannelImpl.java:819)

  at sun.nio.ch.DatagramChannelImpl.join(DatagramChannelImpl.java:905)

  at io.netty.channel.socket.nio.NioDatagramChannel.joinGroup(NioDatagramChannel.java:414)

  at io.netty.channel.socket.nio.NioDatagramChannel.joinGroup(NioDatagramChannel.java:391)

  at io.netty.channel.socket.nio.NioDatagramChannel.joinGroup(NioDatagramChannel.java:384)

  at io.netty.channel.socket.DatagramChannel$joinGroup.call(Unknown Source)

  at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:45)

  at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:108)

  at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:120)

解決辦法是定義 ChannelFactory,指定協定簇為 IPv4

b.group(group)
	  //.channel(NioDatagramChannel.class) 
	  //使用指定 ChannelFactory
	  .channelFactory(new ChannelFactory<NioDatagramChannel>() {
	      @Override
	      NioDatagramChannel newChannel() {
	      	  // 指定協定簇為 IPv4
	          return new NioDatagramChannel(InternetProtocolFamily.IPv4)
	      }
	  })
	  ...
           

小結

使用 Netty 的 UDP 服務與 TCP 服務程式結構幾乎相同, 主要差別為:

  • 使用 NioDatagramChannel 通訊
  • 使用 DatagramPacket 封裝資料

繼續閱讀