一、基礎知識
UDP 協定相較于 TCP 協定的特點:
1、無連接配接協定,沒有持久化連接配接;
2、每個 UDP 資料報都是一個單獨的傳輸單元;
3、一定的資料報丢失;
4、沒有重傳機制,也不管資料報是否可達;
5、速度比TCP快很多,可用來高效處理大量資料 —— 犧牲了握手以及消息管理機制。
6、常用于音頻、視訊場景,可以忍受一定的資料包丢失,追求速度上的提升。
TCP 協定采用的是一種叫做單點傳播的傳輸形式,UDP 協定提供了向多個接收者發送消息的額外傳輸形式(多點傳播、廣播):
單點傳播(TCP 和 UDP):發送消息給一個由唯一的位址所辨別的單一的網絡目的地。
多點傳播(UDP):傳輸給一個預定義的主機組。
廣播(UDP):傳輸到網絡(或者子網)上的所有主機。
二、功能說明
廣播方:打開一個檔案,通過 UDP 使用特殊的受限廣播位址或者零網絡位址 255.255.255.255,把每一行作為一個消息廣播到一個指定的端口。
接收方:通過 UDP 廣播,隻需簡單地通過在指定的端口上啟動一個監聽程式,便可以建立一個事件螢幕來接收日志消息。所有的在該 UDP 端口上監聽的事件監聽器都将會接收到廣播資訊。

三、實作
下圖展示了怎麼将我們的 檔案資料 廣播為 UDP消息:所有的将要被傳輸的資料都被封裝在了 LogEvent 消息中。 LogEventBroadcaster 将把這些寫入到 Channel 中,并通過 ChannelPipeline 發送它們,在那裡它們将會被轉換(編碼)為 DatagramPacket 消息。最後,他們都将通過 UDP 被廣播,并由遠端節點(螢幕)所捕獲。
Netty 中支援 UDP 協定主要通過以下相關類:
DatagramPacket:使用 ByteBuf 作為資料源,是 UDP 協定傳輸的消息容器。
DatagramChannel:擴充了 Netty 的 Channel 抽象以支援 UDP 的多點傳播組管理,它的實作類 NioDatagramChannnel 用來和遠端節點通信。
Bootstrap:UDP 協定的引導類,使用 bind() 方法綁定 Channel。
public class LogEvent {
public static final byte SEPARATOR = ':';
/**
* IP套接字位址(IP位址+端口号)
*/
private final InetSocketAddress inetSocketAddress;
/**
* 檔案名
*/
private final String logfile;
/**
* 消息内容
*/
private final String msg;
private final long received;
/**
* 用于傳入消息的構造函數
*
* @param inetSocketAddress
* @param logfile
* @param msg
* @param received
*/
public LogEvent(InetSocketAddress inetSocketAddress, String logfile, String msg, long received) {
this.inetSocketAddress = inetSocketAddress;
this.logfile = logfile;
this.msg = msg;
this.received = received;
}
/**
* 用于傳出消息的構造函數
*
* @param logfile
* @param msg
*/
public LogEvent(String logfile, String msg) {
this(null, logfile, msg, -1);
}
public InetSocketAddress getInetSocketAddress() {
return inetSocketAddress;
}
public String getLogfile() {
return logfile;
}
public String getMsg() {
return msg;
}
public long getReceived() {
return received;
}
}
檔案實體類 LogEvent.java
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
private final InetSocketAddress remoteAddress;
public LogEventEncoder(InetSocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}
@Override
protected void encode(ChannelHandlerContext ctx, LogEvent msg, List<Object> out) throws Exception {
byte[] file = msg.getLogfile().getBytes(CharsetUtil.UTF_8);
byte[] content = msg.getMsg().getBytes(CharsetUtil.UTF_8);
ByteBuf byteBuf = ctx.alloc().buffer(file.length + content.length + 1);
byteBuf.writeBytes(file);
byteBuf.writeByte(LogEvent.SEPARATOR);
byteBuf.writeBytes(content);
out.add(new DatagramPacket(byteBuf, remoteAddress));
}
}
編碼器 LogEventEncoder.java
該編碼器實作了将 LogEvent 實體類内容轉換為 DatagramPacket UDP資料報。
public class LogEventBroadcaster {
private final EventLoopGroup group;
private final Bootstrap bootstrap;
private final File file;
public LogEventBroadcaster(InetSocketAddress address, File file) {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
//引導該 NioDatagramChannel(無連接配接的)
.channel(NioDatagramChannel.class)
// 設定 SO_BROADCAST 套接字選項
.option(ChannelOption.SO_BROADCAST, true)
.handler(new LogEventEncoder(address));
this.file = file;
}
public void run() throws InterruptedException, IOException {
//綁定 Channel,UDP 協定的連接配接用 bind() 方法
Channel channel = bootstrap.bind(0).sync().channel();
long pointer = 0;
//長輪詢 監聽是否有新的日志檔案生成
while (true) {
long length = file.length();
if (length < pointer) {
// 如果有必要,将檔案指針設定到該檔案的最後一個位元組
pointer = length;
} else {
RandomAccessFile raf = new RandomAccessFile(file, "r");
// 確定目前的檔案指針,以確定沒有任何的舊資料被發送
raf.seek(pointer);
String line;
while ((line = raf.readLine()) != null) {
//對于每個日志條目,寫入一個 LogEvent 到 Channel 中,最後加入一個換行符号
channel.writeAndFlush(new LogEvent(file.getAbsolutePath(), line + System.getProperty("line.separator")));
}
pointer = raf.getFilePointer();
raf.close();
}
try {
// 休眠一秒,如果被中斷,則退出循環,否則重新處理它
Thread.sleep(1000);
} catch (InterruptedException e) {
while (!Thread.interrupted()) {
break;
}
}
}
}
public void stop() {
group.shutdownGracefully();
}
public static void main(String[] args) throws IOException, InterruptedException {
InetSocketAddress socketAddress = new InetSocketAddress("255.255.255.255", 8888);
File file = new File("E:\\2018-09-12.log");
LogEventBroadcaster logEventBroadcaster = new LogEventBroadcaster(socketAddress, file);
try {
logEventBroadcaster.run();
} finally {
logEventBroadcaster.stop();
}
}
}
現在,我們來測試一下這個 UDP 廣播類,首先我們需要一個工具 nmap ,用它來監聽 UDP 的 8888 端口,以接收我們廣播的日志檔案。下載下傳位址: https://nmap.org/dist/nmap-7.70-win32.zip
下載下傳完成後,指令行進入安裝目錄,執行指令:ncat.exe -l -u -p 8888 ,監聽 UDP 端口。
當然,也可以自己寫個測試類監聽 UDP 端口,列印日志檢視。這裡我沒有用 Netty 寫監聽類,直接用了 java 原生的 DatagramSocket 和 DatagramPacket 寫的監聽類,如下:
public class UDPServer {
public static void main(String[] args) {
DatagramSocket server = null;
try {
server = new DatagramSocket(8888);
byte[] datas = new byte[1024];
//用一個位元組數組接收UDP包,位元組數組在傳遞給構造函數時是空的
while (true) {
DatagramPacket datagramPacket = new DatagramPacket(datas, datas.length);
server.receive(datagramPacket);
System.out.println(new String(datas));
}
} catch (SocketException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
server.close();
}
}
}
UDPServer.java
基于 Netty 的監聽類實作可以參考我上傳 GitHub 上的源代碼。
參考資料:《Netty IN ACTION》
示範源代碼:https://github.com/JMCuixy/NettyDemo/tree/master/src/main/java/org/netty/demo/udp