天天看點

【Netty】UDP廣播事件

一、前言

  前面學習了WebSocket協定,并且通過示例講解了WebSocket的具體使用,接着學習如何使用無連接配接的UDP來廣播事件。

二、UDP廣播事件

  2.1 UDP基礎

  面向連接配接的TCP協定管理端到端的連接配接,在連接配接生命周期中,發送的消息會有序并且可靠地進行傳輸,最後連接配接有序地終止。然而,在無連接配接協定(如UDP)中,沒有持久連接配接的概念,每個消息(UDP資料報)都是獨立的傳輸,此外,UDP沒有TCP的糾錯機制(即每個對等體會确認其接收到的分組,并且發送者會重傳未确認的分組)。

  UDP的限制比TCP多,但是比TCP快很多,這是因為消除了握手和消息管理的所有開銷,UDP非常适合處理或容忍消息丢失的應用。

  2.2 UDP廣播

  迄今為止所有的示例都使用了單點傳播的傳輸模式,其被定義為将消息發送到由唯一位址辨別的單個網絡目的地,有連接配接和無連接配接的協定都支援這種模式,UDP為多個收件人發送消息提供了額外的傳輸模式:

    · 多點傳播--傳輸到定義的主機組。

    · 廣播--傳輸到網絡(或子網)上的所有主機。

  本章中的示例将通過發送在同一網絡上的所有主機接收的消息來使用UDP廣播。

  2.3 UDP簡單示例

  示例将打開一個檔案,并通過UDP将每一行廣播為指定端口。下圖展示了應用的結構圖。

  

【Netty】UDP廣播事件

  2.4 LogEvent POJO

  在消息應用中,消息經常以POJO形式展現,LogEvent的POJO如下。  

public final class LogEvent {
    public static final byte SEPARATOR = (byte) ':';
    private final InetSocketAddress source;
    private final String logfile;
    private final String msg;
    private final long received;
    public LogEvent(String logfile, String msg) {
        this(null, -1, logfile, msg);
    }
    public LogEvent(InetSocketAddress source, long received,
        String logfile, String msg) {
        this.source = source;
        this.logfile = logfile;
        this.msg = msg;
        this.received = received;
    }
    public InetSocketAddress getSource() {
        return source;
    }
    public String getLogfile() {
        return logfile;
    }
    public String getMsg() {
        return msg;
    }
    public long getReceivedTimestamp() {
        return received;
    }
}      

  2.5 編寫broadcaster

  Netty提供了許多類來支援UDP應用程式,如Netty的DatagramPacket是DatagramChannel實作與遠端對等體進行通信的簡單消息容器,我們需要一個編碼器将EventLog消息轉換為DatagramPackets,可以擴充Netty的MessageToMessageEncoder,LogEventEncoder的代碼如下。  

public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
    private final InetSocketAddress remoteAddress;
    public LogEventEncoder(InetSocketAddress remoteAddress) {
        this.remoteAddress = remoteAddress;
    }
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext,
        LogEvent logEvent, List<Object> out) throws Exception {
        byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
        byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
        ByteBuf buf = channelHandlerContext.alloc()
            .buffer(file.length + msg.length + 1);
        buf.writeBytes(file);
        buf.writeByte(LogEvent.SEPARATOR);
        buf.writeBytes(msg);
        out.add(new DatagramPacket(buf, remoteAddress));
    }
}      

  完成編碼器後,即可以開始啟動服務端,其中服務端LogEventBroadcaster的代碼如下。  

public class LogEventBroadcaster {
    private final Bootstrap bootstrap;
    private final File file;
    private final EventLoopGroup group;

    public LogEventBroadcaster(InetSocketAddress address, File file) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new LogEventEncoder(address));

        this.file = file;
    }

    public void run() throws IOException {
        Channel ch = bootstrap.bind(0).syncUninterruptibly().channel();
        System.out.println("LogEventBroadcaster running");
        long pointer = 0;
        for (;;) {
            long len = file.length();
            if (len < pointer) {
                // file was reset
                pointer = len;
            } else if (len > pointer) {
                // Content was added
                RandomAccessFile raf = new RandomAccessFile(file, "r");
                raf.seek(pointer);
                String line;
                while ((line = raf.readLine()) != null) {
                    ch.writeAndFlush(new LogEvent(null, -1, file.getAbsolutePath(), line));
                }
                pointer = raf.getFilePointer();
                raf.close();
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.interrupted();
                break;
            }
        }
    }

    public void stop() {
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            throw new IllegalArgumentException();
        }

        LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress("255.255.255.255",
                Integer.parseInt(args[0])), new File(args[1]));
        try {
            broadcaster.run();
        } finally {
            broadcaster.stop();
        }
    }
}      

  2.6 編寫monitor

  在應用中

    · 接收由LogEventBroadcaster廣播的UDP DatagramPackets。

    · 将其解碼為LogEvent。

    · 将LogEvent寫入輸出流System.out。

  下圖展示LogEvent的流動。

  

【Netty】UDP廣播事件

  LogEventDecoder負責将傳入的DatagramPackets解碼為LogEvent消息,其代碼如下。  

public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
    @Override
    protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception {
        ByteBuf data = datagramPacket.content();
        int i = data.indexOf(0, data.readableBytes(), LogEvent.SEPARATOR);
        String filename = data.slice(0, i).toString(CharsetUtil.UTF_8);
        String logMsg =  data.slice(i + 1, data.readableBytes()).toString(CharsetUtil.UTF_8);

        LogEvent event = new LogEvent(datagramPacket.recipient(), System.currentTimeMillis(),
                filename,logMsg);
        out.add(event);
    }
}      

  而LogEventHandler用于處理LogEvent,其代碼如下。  

public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, LogEvent event) throws Exception {
        StringBuilder builder = new StringBuilder();
        builder.append(event.getReceivedTimestamp());
        builder.append(" [");
        builder.append(event.getSource().toString());
        builder.append("] [");
        builder.append(event.getLogfile());
        builder.append("] : ");
        builder.append(event.getMsg());

        System.out.println(builder.toString());
    }
}      

  LogEventMonitor用于将處理器添加至管道中,其代碼如下。  

public class LogEventMonitor {

    private final Bootstrap bootstrap;
    private final EventLoopGroup group;
    public LogEventMonitor(InetSocketAddress address) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast(new LogEventDecoder());
                        pipeline.addLast(new LogEventHandler());
                    }
                }).localAddress(address);

    }

    public Channel bind() {
        return bootstrap.bind().syncUninterruptibly().channel();
    }

    public void stop() {
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            throw new IllegalArgumentException("Usage: LogEventMonitor <port>");
        }
        LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(Integer.parseInt(args[0])));
        try {
            Channel channel = monitor.bind();
            System.out.println("LogEventMonitor running");

            channel.closeFuture().await();
        } finally {
            monitor.stop();
        }
    }
}      

  運作LogEventBroadcaster和LogEventMonitor

三、總結

  本篇博文講解了UDP協定,以及其示例,在實際應用中需要根據不同的應用場景選擇不同的協定,謝謝各位園友的觀看~

繼續閱讀