一:簡介
Netty是由JBOSS提供的一個java開源架構。Netty提供異步的、事件驅動的網絡應用程式架構和工具,用以快速開發高性能、高可靠性的網絡伺服器和用戶端程式。也就是說,Netty 是一個基于NIO的客戶、伺服器端程式設計架構,使用Netty 可以確定你快速和簡單的開發出一個網絡應用,例如實作了某種協定的客戶,服務端應用。Netty相當簡化和流線化了網絡應用的程式設計開發過程,例如,TCP和UDP的socket服務開發。
“快速”和“簡單”并不用産生維護性或性能上的問題。Netty 是一個吸收了多種協定的實作經驗,這些協定包括FTP,SMTP,HTTP,各種二進制,文本協定,并經過相當精心設計的項目,最終,Netty 成功的找到了一種方式,在保證易于開發的同時還保證了其應用的性能,穩定性和伸縮性。
Netty從4.x版本開始,需要使用JDK1.6及以上版本提供基礎支撐。
在設計上:針對多種傳輸類型的統一接口 - 阻塞和非阻塞;簡單但更強大的線程模型;真正的無連接配接的資料報套接字支援;連結邏輯支援複用。
在性能上:比核心 Java API 更好的吞吐量,較低的延時;資源消耗更少,這個得益于共享池和重用;減少記憶體拷貝。
在健壯性上:消除由于慢,快,或重載連接配接産生的 OutOfMemoryError;消除經常發現在 NIO 在高速網絡中的應用中的不公平的讀/寫比。
在安全上:完整的 SSL / TLS 和 StartTLS 的支援,且已得到大量商業應用的真實驗證,如:Hadoop項目的Avro(RPC架構)、Dubbo、Dubbox等RPC架構。
Netty的官網是:
http://netty.io
有三方提供的中文翻譯Netty使用者手冊(官網提供源資訊):
http://ifeve.com/netty5-user-guide/
二:Netty架構
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHLw0ERONzaq1kMNpHW4Z0MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL5ITO2QDMxYTM0ITMxkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
三:線程模型
1:單線程模型
在ServerBootstrap調用方法group的時候,傳遞的參數是同一個線程組,且在構造線程組的時候,構造參數為1,這種開發方式,就是一個單線程模型。個人機開發測試使用。不推薦。
2:多線程模型
在ServerBootstrap調用方法group的時候,傳遞的參數是兩個不同的線程組。負責監聽的acceptor線程組,線程數為1,也就是構造參數為1。負責處理用戶端任務的線程組,線程數大于1,也就是構造參數大于1。這種開發方式,就是多線程模型。長連接配接,且用戶端數量較少,連接配接持續時間較長情況下使用。如:企業内部交流應用。
3:主從多線程模型
在ServerBootstrap調用方法group的時候,傳遞的參數是兩個不同的線程組。負責監聽的acceptor線程組,線程數大于1,也就是構造參數大于1。負責處理用戶端任務的線程組,線程數大于1,也就是構造參數大于1。這種開發方式,就是主從多線程模型。長連接配接,用戶端數量相對較多,連接配接持續時間比較長的情況下使用。如:對外提供服務的相冊伺服器。
四:基礎程式
1:入門
/**
* 1. 雙線程組
* 2. Bootstrap配置啟動資訊
* 3. 注冊業務處理Handler
* 4. 綁定服務監聽端口并啟動服務
*/
package com.hhxy.netty.first;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class Server4HelloWorld {
// 監聽線程組,監聽用戶端請求
private EventLoopGroup acceptorGroup = null;
// 處理用戶端相關操作線程組,負責處理與用戶端的資料通訊
private EventLoopGroup clientGroup = null;
// 服務啟動相關配置資訊
private ServerBootstrap bootstrap = null;
public Server4HelloWorld(){
init();
}
private void init(){
// 初始化線程組,建構線程組的時候,如果不傳遞參數,則預設建構的線程組線程數是CPU核心數量。
acceptorGroup = new NioEventLoopGroup();
clientGroup = new NioEventLoopGroup();
// 初始化服務的配置
bootstrap = new ServerBootstrap();
// 綁定線程組
bootstrap.group(acceptorGroup, clientGroup);
// 設定通訊模式為NIO, 同步非阻塞
bootstrap.channel(NioServerSocketChannel.class);
// 設定緩沖區大小, 緩存區的機關是位元組。
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// SO_SNDBUF發送緩沖區,SO_RCVBUF接收緩沖區,SO_KEEPALIVE開啟心跳監測(保證連接配接有效)
bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
.option(ChannelOption.SO_RCVBUF, 16*1024)
.option(ChannelOption.SO_KEEPALIVE, true);
}
/**
* 監聽處理邏輯。
* @param port 監聽端口。
* @param acceptorHandlers 處理器, 如何處理用戶端請求。
* @return
* @throws InterruptedException
*/
public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException{
/*
* childHandler是服務的Bootstrap獨有的方法。是用于提供處理對象的。
* 可以一次性增加若幹個處理邏輯。是類似責任鍊模式的處理方式。
* 增加A,B兩個處理邏輯,在處理用戶端請求資料的時候,根據A-》B順序依次處理。
*
* ChannelInitializer - 用于提供處理器的一個模型對象。
* 其中定義了一個方法,initChannel方法。
* 方法是用于初始化處理邏輯責任鍊條的。
* 可以保證服務端的Bootstrap隻初始化一次處理器,盡量提供處理邏輯的重用。
* 避免反複的建立處理器對象。節約資源開銷。
*/
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(acceptorHandlers);
}
});
// bind方法 - 綁定監聽端口的。ServerBootstrap可以綁定多個監聽端口。 多次調用bind方法即可
// sync - 開始監聽邏輯。 傳回一個ChannelFuture。 傳回結果代表的是監聽成功後的一個對應的未來結果
// 可以使用ChannelFuture實作後續的伺服器和用戶端的互動。
ChannelFuture future = bootstrap.bind(port).sync();
return future;
}
/**
* shutdownGracefully - 方法是一個安全關閉的方法。可以保證不放棄任何一個已接收的用戶端請求。
*/
public void release(){
this.acceptorGroup.shutdownGracefully();
this.clientGroup.shutdownGracefully();
}
public static void main(String[] args){
ChannelFuture future = null;
Server4HelloWorld server = null;
try{
server = new Server4HelloWorld();
future = server.doAccept(9999,new Server4HelloWorldHandler());
System.out.println("server started.");
// 關閉連接配接的。
future.channel().closeFuture().sync();
}catch(InterruptedException e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != server){
server.release();
}
}
}
}
/**
* @Sharable注解 -
* 代表目前Handler是一個可以分享的處理器。也就意味着,伺服器注冊此Handler後,可以分享給多個用戶端同時使用。
* 如果不使用注解描述類型,則每次用戶端請求時,必須為用戶端重新建立一個新的Handler對象。
* 如果handler是一個Sharable的,一定避免定義可寫的執行個體變量。
* bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new XxxHandler());
}
});
*/
package com.hhxy.netty.first;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
@Sharable
public class Server4HelloWorldHandler extends ChannelHandlerAdapter {
/**
* 業務處理邏輯
* 用于處理讀取資料請求的邏輯。
* ctx - 上下文對象。其中包含于用戶端建立連接配接的所有資源。 如: 對應的Channel
* msg - 讀取到的資料。 預設類型是ByteBuf,是Netty自定義的。是對ByteBuffer的封裝。 不需要考慮複位問題。
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 擷取讀取的資料, 是一個緩沖。
ByteBuf readBuffer = (ByteBuf) msg;
// 建立一個位元組數組,用于儲存緩存中的資料。
byte[] tempDatas = new byte[readBuffer.readableBytes()];
// 将緩存中的資料讀取到位元組數組中。
readBuffer.readBytes(tempDatas);
String message = new String(tempDatas, "UTF-8");
System.out.println("from client : " + message);
if("exit".equals(message)){
ctx.close();
return;
}
String line = "server message to client!";
// 寫操作自動釋放緩存,避免記憶體溢出問題。
ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
// 注意,如果調用的是write方法。不會重新整理緩存,緩存中的資料不會發送到用戶端,必須再次調用flush方法才行。
// ctx.write(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
// ctx.flush();
}
/**
* 異常處理邏輯, 當用戶端異常退出的時候,也會運作。
* ChannelHandlerContext關閉,也代表目前與用戶端連接配接的資源關閉。
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}
}
/**
* 1. 單線程組
* 2. Bootstrap配置啟動資訊
* 3. 注冊業務處理Handler
* 4. connect連接配接服務,并發起請求
*/
package com.hhxy.netty.first;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* 因為用戶端是請求的發起者,不需要監聽。
* 隻需要定義唯一的一個線程組即可。
*/
public class Client4HelloWorld {
// 處理請求和處理服務端響應的線程組
private EventLoopGroup group = null;
// 用戶端啟動相關配置資訊
private Bootstrap bootstrap = null;
public Client4HelloWorld(){
init();
}
private void init(){
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
// 綁定線程組
bootstrap.group(group);
// 設定通訊模式為NIO
bootstrap.channel(NioSocketChannel.class);
}
public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException{
/*
* 用戶端的Bootstrap沒有childHandler方法。隻有handler方法。
* 方法含義等同ServerBootstrap中的childHandler
* 在用戶端必須綁定處理器,也就是必須調用handler方法。
* 伺服器必須綁定處理器,必須調用childHandler方法。
*/
this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(handlers);
}
});
// 建立連接配接。
ChannelFuture future = this.bootstrap.connect(host, port).sync();
return future;
}
public void release(){
this.group.shutdownGracefully();
}
public static void main(String[] args) {
Client4HelloWorld client = null;
ChannelFuture future = null;
try{
client = new Client4HelloWorld();
future = client.doRequest("localhost", 9999, new Client4HelloWorldHandler());
Scanner s = null;
while(true){
s = new Scanner(System.in);
System.out.print("enter message send to server (enter 'exit' for close client) > ");
String line = s.nextLine();
if("exit".equals(line)){
// addListener - 增加監聽,當某條件滿足的時候,觸發監聽器。
// ChannelFutureListener.CLOSE - 關閉監聽器,代表ChannelFuture執行傳回後,關閉連接配接。
future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")))
.addListener(ChannelFutureListener.CLOSE);
break;
}
future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
TimeUnit.SECONDS.sleep(1);
}
}catch(Exception e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != client){
client.release();
}
}
}
}
package com.hhxy.netty.first;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
public class Client4HelloWorldHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
ByteBuf readBuffer = (ByteBuf) msg;
byte[] tempDatas = new byte[readBuffer.readableBytes()];
readBuffer.readBytes(tempDatas);
System.out.println("from server : " + new String(tempDatas, "UTF-8"));
}finally{
// 用于釋放緩存。避免記憶體溢出
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}
/*@Override // 斷開連接配接時執行
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive method run...");
}
@Override // 連接配接通道建立成功時執行
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive method run...");
}
@Override // 每次讀取完成時執行
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete method run...");
}*/
}
2:拆包粘包問題解決
netty使用tcp/ip協定傳輸資料。而tcp/ip協定是類似水流一樣的資料傳輸方式。多次通路的時候有可能出現資料粘包的問題,解決這種問題的方式如下:
2.1:定長資料流
用戶端和伺服器,提前協調好,每個消息長度固定。(如:長度10)。如果用戶端或伺服器寫出的資料不足10,則使用空白字元補足(如:使用空格)。
/**
* 1. 雙線程組
* 2. Bootstrap配置啟動資訊
* 3. 注冊業務處理Handler
* 4. 綁定服務監聽端口并啟動服務
*/
package com.hhxy.netty.fixedlength;
import java.nio.charset.Charset;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
public class Server4FixedLength {
// 監聽線程組,監聽用戶端請求
private EventLoopGroup acceptorGroup = null;
// 處理用戶端相關操作線程組,負責處理與用戶端的資料通訊
private EventLoopGroup clientGroup = null;
// 服務啟動相關配置資訊
private ServerBootstrap bootstrap = null;
public Server4FixedLength(){
init();
}
private void init(){
acceptorGroup = new NioEventLoopGroup();
clientGroup = new NioEventLoopGroup();
bootstrap = new ServerBootstrap();
// 綁定線程組
bootstrap.group(acceptorGroup, clientGroup);
// 設定通訊模式為NIO
bootstrap.channel(NioServerSocketChannel.class);
// 設定緩沖區大小
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// SO_SNDBUF發送緩沖區,SO_RCVBUF接收緩沖區,SO_KEEPALIVE開啟心跳監測(保證連接配接有效)
bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
.option(ChannelOption.SO_RCVBUF, 16*1024)
.option(ChannelOption.SO_KEEPALIVE, true);
}
public ChannelFuture doAccept(int port) throws InterruptedException{
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelHandler[] acceptorHandlers = new ChannelHandler[3];
// 定長Handler。通過構造參數設定消息長度(機關是位元組)。發送的消息長度不足可以使用空格補全。
acceptorHandlers[0] = new FixedLengthFrameDecoder(5);
// 字元串解碼器Handler,會自動處理channelRead方法的msg參數,将ByteBuf類型的資料轉換為字元串對象
acceptorHandlers[1] = new StringDecoder(Charset.forName("UTF-8"));
acceptorHandlers[2] = new Server4FixedLengthHandler();
ch.pipeline().addLast(acceptorHandlers);
}
});
ChannelFuture future = bootstrap.bind(port).sync();
return future;
}
public void release(){
this.acceptorGroup.shutdownGracefully();
this.clientGroup.shutdownGracefully();
}
public static void main(String[] args){
ChannelFuture future = null;
Server4FixedLength server = null;
try{
server = new Server4FixedLength();
future = server.doAccept(9999);
System.out.println("server started.");
future.channel().closeFuture().sync();
}catch(InterruptedException e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != server){
server.release();
}
}
}
}
package com.hhxy.netty.fixedlength;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class Server4FixedLengthHandler extends ChannelHandlerAdapter {
// 業務處理邏輯
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = msg.toString();
System.out.println("from client : " + message.trim());
String line = "ok ";
ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
}
// 異常處理邏輯
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}
}
/**
* 1. 單線程組
* 2. Bootstrap配置啟動資訊
* 3. 注冊業務處理Handler
* 4. connect連接配接服務,并發起請求
*/
package com.hhxy.netty.fixedlength;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
public class Client4FixedLength {
// 處理請求和處理服務端響應的線程組
private EventLoopGroup group = null;
// 服務啟動相關配置資訊
private Bootstrap bootstrap = null;
public Client4FixedLength(){
init();
}
private void init(){
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
// 綁定線程組
bootstrap.group(group);
// 設定通訊模式為NIO
bootstrap.channel(NioSocketChannel.class);
}
public ChannelFuture doRequest(String host, int port) throws InterruptedException{
this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelHandler[] handlers = new ChannelHandler[3];
handlers[0] = new FixedLengthFrameDecoder(3);
// 字元串解碼器Handler,會自動處理channelRead方法的msg參數,将ByteBuf類型的資料轉換為字元串對象
handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
handlers[2] = new Client4FixedLengthHandler();
ch.pipeline().addLast(handlers);
}
});
ChannelFuture future = this.bootstrap.connect(host, port).sync();
return future;
}
public void release(){
this.group.shutdownGracefully();
}
public static void main(String[] args) {
Client4FixedLength client = null;
ChannelFuture future = null;
try{
client = new Client4FixedLength();
future = client.doRequest("localhost", 9999);
Scanner s = null;
while(true){
s = new Scanner(System.in);
System.out.print("enter message send to server > ");
String line = s.nextLine();
byte[] bs = new byte[5];
byte[] temp = line.getBytes("UTF-8");
if(temp.length <= 5){
for(int i = 0; i < temp.length; i++){
bs[i] = temp[i];
}
}
future.channel().writeAndFlush(Unpooled.copiedBuffer(bs));
TimeUnit.SECONDS.sleep(1);
}
}catch(Exception e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != client){
client.release();
}
}
}
}
package com.hhxy.netty.fixedlength;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
public class Client4FixedLengthHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
String message = msg.toString();
System.out.println("from server : " + message);
}finally{
// 用于釋放緩存。避免記憶體溢出
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}
}
2.2:特殊結束符
用戶端和伺服器,協商定義一個特殊的分隔符号,分隔符号長度自定義。如:‘#’、‘$_$’、‘AA@’。在通訊的時候,隻要沒有發送分隔符号,則代表一條資料沒有結束。
/**
* 1. 雙線程組
* 2. Bootstrap配置啟動資訊
* 3. 注冊業務處理Handler
* 4. 綁定服務監聽端口并啟動服務
*/
package com.hhxy.netty.delimiter;
import java.nio.charset.Charset;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
public class Server4Delimiter {
// 監聽線程組,監聽用戶端請求
private EventLoopGroup acceptorGroup = null;
// 處理用戶端相關操作線程組,負責處理與用戶端的資料通訊
private EventLoopGroup clientGroup = null;
// 服務啟動相關配置資訊
private ServerBootstrap bootstrap = null;
public Server4Delimiter(){
init();
}
private void init(){
acceptorGroup = new NioEventLoopGroup();
clientGroup = new NioEventLoopGroup();
bootstrap = new ServerBootstrap();
// 綁定線程組
bootstrap.group(acceptorGroup, clientGroup);
// 設定通訊模式為NIO
bootstrap.channel(NioServerSocketChannel.class);
// 設定緩沖區大小
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// SO_SNDBUF發送緩沖區,SO_RCVBUF接收緩沖區,SO_KEEPALIVE開啟心跳監測(保證連接配接有效)
bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
.option(ChannelOption.SO_RCVBUF, 16*1024)
.option(ChannelOption.SO_KEEPALIVE, true);
}
public ChannelFuture doAccept(int port) throws InterruptedException{
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 資料分隔符, 定義的資料分隔符一定是一個ByteBuf類型的資料對象。
ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());
ChannelHandler[] acceptorHandlers = new ChannelHandler[3];
// 處理固定結束标記符号的Handler。這個Handler沒有@Sharable注解修飾,
// 必須每次初始化通道時建立一個新對象
// 使用特殊符号分隔處理資料粘包問題,也要定義每個資料包最大長度。netty建議資料有最大長度。
acceptorHandlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter);
// 字元串解碼器Handler,會自動處理channelRead方法的msg參數,将ByteBuf類型的資料轉換為字元串對象
acceptorHandlers[1] = new StringDecoder(Charset.forName("UTF-8"));
acceptorHandlers[2] = new Server4DelimiterHandler();
ch.pipeline().addLast(acceptorHandlers);
}
});
ChannelFuture future = bootstrap.bind(port).sync();
return future;
}
public void release(){
this.acceptorGroup.shutdownGracefully();
this.clientGroup.shutdownGracefully();
}
public static void main(String[] args){
ChannelFuture future = null;
Server4Delimiter server = null;
try{
server = new Server4Delimiter();
future = server.doAccept(9999);
System.out.println("server started.");
future.channel().closeFuture().sync();
}catch(InterruptedException e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != server){
server.release();
}
}
}
}
package com.hhxy.netty.delimiter;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class Server4DelimiterHandler extends ChannelHandlerAdapter {
// 業務處理邏輯
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = msg.toString();
System.out.println("from client : " + message);
String line = "server message $E$ test delimiter handler!! $E$ second message $E$";
ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
}
// 異常處理邏輯
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}
}
/**
* 1. 單線程組
* 2. Bootstrap配置啟動資訊
* 3. 注冊業務處理Handler
* 4. connect連接配接服務,并發起請求
*/
package com.hhxy.netty.delimiter;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
public class Client4Delimiter {
// 處理請求和處理服務端響應的線程組
private EventLoopGroup group = null;
// 服務啟動相關配置資訊
private Bootstrap bootstrap = null;
public Client4Delimiter(){
init();
}
private void init(){
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
// 綁定線程組
bootstrap.group(group);
// 設定通訊模式為NIO
bootstrap.channel(NioSocketChannel.class);
}
public ChannelFuture doRequest(String host, int port) throws InterruptedException{
this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 資料分隔符
ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());
ChannelHandler[] handlers = new ChannelHandler[3];
handlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter);
// 字元串解碼器Handler,會自動處理channelRead方法的msg參數,将ByteBuf類型的資料轉換為字元串對象
handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
handlers[2] = new Client4DelimiterHandler();
ch.pipeline().addLast(handlers);
}
});
ChannelFuture future = this.bootstrap.connect(host, port).sync();
return future;
}
public void release(){
this.group.shutdownGracefully();
}
public static void main(String[] args) {
Client4Delimiter client = null;
ChannelFuture future = null;
try{
client = new Client4Delimiter();
future = client.doRequest("localhost", 9999);
Scanner s = null;
while(true){
s = new Scanner(System.in);
System.out.print("enter message send to server > ");
String line = s.nextLine();
future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
TimeUnit.SECONDS.sleep(1);
}
}catch(Exception e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != client){
client.release();
}
}
}
}
package com.hhxy.netty.delimiter;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
public class Client4DelimiterHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
String message = msg.toString();
System.out.println("from server : " + message);
}finally{
// 用于釋放緩存。避免記憶體溢出
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}
}
2.3:協定
相對最成熟的資料傳遞方式。有伺服器的開發者提供一個固定格式的協定标準。用戶端和伺服器發送資料和接受資料的時候,都依據協定制定和解析消息。
協定格式:
HEADcontent-length:xxxxHEADBODYxxxxxxBODY
/**
* 1. 雙線程組
* 2. Bootstrap配置啟動資訊
* 3. 注冊業務處理Handler
* 4. 綁定服務監聽端口并啟動服務
*/
package com.hhxy.netty.protocol;
import java.nio.charset.Charset;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class Server4Protocol {
// 監聽線程組,監聽用戶端請求
private EventLoopGroup acceptorGroup = null;
// 處理用戶端相關操作線程組,負責處理與用戶端的資料通訊
private EventLoopGroup clientGroup = null;
// 服務啟動相關配置資訊
private ServerBootstrap bootstrap = null;
public Server4Protocol(){
init();
}
private void init(){
acceptorGroup = new NioEventLoopGroup();
clientGroup = new NioEventLoopGroup();
bootstrap = new ServerBootstrap();
// 綁定線程組
bootstrap.group(acceptorGroup, clientGroup);
// 設定通訊模式為NIO
bootstrap.channel(NioServerSocketChannel.class);
// 設定緩沖區大小
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// SO_SNDBUF發送緩沖區,SO_RCVBUF接收緩沖區,SO_KEEPALIVE開啟心跳監測(保證連接配接有效)
bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
.option(ChannelOption.SO_RCVBUF, 16*1024)
.option(ChannelOption.SO_KEEPALIVE, true);
}
public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException{
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(acceptorHandlers);
}
});
ChannelFuture future = bootstrap.bind(port).sync();
return future;
}
public void release(){
this.acceptorGroup.shutdownGracefully();
this.clientGroup.shutdownGracefully();
}
public static void main(String[] args){
ChannelFuture future = null;
Server4Protocol server = null;
try{
server = new Server4Protocol();
future = server.doAccept(9999,new Server4ProtocolHandler());
System.out.println("server started.");
future.channel().closeFuture().sync();
}catch(InterruptedException e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != server){
server.release();
}
}
}
}
/**
* @Sharable注解 -
* 代表目前Handler是一個可以分享的處理器。也就意味着,伺服器注冊此Handler後,可以分享給多個用戶端同時使用。
* 如果不使用注解描述類型,則每次用戶端請求時,必須為用戶端重新建立一個新的Handler對象。
*
*/
package com.hhxy.netty.protocol;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
@Sharable
public class Server4ProtocolHandler extends ChannelHandlerAdapter {
// 業務處理邏輯
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = msg.toString();
System.out.println("server receive protocol content : " + message);
message = ProtocolParser.parse(message);
if(null == message){
System.out.println("error request from client");
return ;
}
System.out.println("from client : " + message);
String line = "server message";
line = ProtocolParser.transferTo(line);
System.out.println("server send protocol content : " + line);
ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
}
// 異常處理邏輯
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server exceptionCaught method run...");
cause.printStackTrace();
ctx.close();
}
static class ProtocolParser{
public static String parse(String message){
String[] temp = message.split("HEADBODY");
temp[0] = temp[0].substring(4);
temp[1] = temp[1].substring(0, (temp[1].length()-4));
int length = Integer.parseInt(temp[0].substring(temp[0].indexOf(":")+1));
if(length != temp[1].length()){
return null;
}
return temp[1];
}
public static String transferTo(String message){
message = "HEADcontent-length:" + message.length() + "HEADBODY" + message + "BODY";
return message;
}
}
}
/**
* 1. 單線程組
* 2. Bootstrap配置啟動資訊
* 3. 注冊業務處理Handler
* 4. connect連接配接服務,并發起請求
*/
package com.hhxy.netty.protocol;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class Client4Protocol {
// 處理請求和處理服務端響應的線程組
private EventLoopGroup group = null;
// 服務啟動相關配置資訊
private Bootstrap bootstrap = null;
public Client4Protocol(){
init();
}
private void init(){
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
// 綁定線程組
bootstrap.group(group);
// 設定通訊模式為NIO
bootstrap.channel(NioSocketChannel.class);
}
public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException{
this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
ch.pipeline().addLast(handlers);
}
});
ChannelFuture future = this.bootstrap.connect(host, port).sync();
return future;
}
public void release(){
this.group.shutdownGracefully();
}
public static void main(String[] args) {
Client4Protocol client = null;
ChannelFuture future = null;
try{
client = new Client4Protocol();
future = client.doRequest("localhost", 9999, new Client4ProtocolHandler());
Scanner s = null;
while(true){
s = new Scanner(System.in);
System.out.print("enter message send to server > ");
String line = s.nextLine();
line = Client4ProtocolHandler.ProtocolParser.transferTo(line);
System.out.println("client send protocol content : " + line);
future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
TimeUnit.SECONDS.sleep(1);
}
}catch(Exception e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != client){
client.release();
}
}
}
}
package com.hhxy.netty.protocol;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
public class Client4ProtocolHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
String message = msg.toString();
System.out.println("client receive protocol content : " + message);
message = ProtocolParser.parse(message);
if(null == message){
System.out.println("error response from server");
return ;
}
System.out.println("from server : " + message);
}finally{
// 用于釋放緩存。避免記憶體溢出
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}
static class ProtocolParser{
public static String parse(String message){
String[] temp = message.split("HEADBODY");
temp[0] = temp[0].substring(4);
temp[1] = temp[1].substring(0, (temp[1].length()-4));
int length = Integer.parseInt(temp[0].substring(temp[0].indexOf(":")+1));
if(length != temp[1].length()){
return null;
}
return temp[1];
}
public static String transferTo(String message){
message = "HEADcontent-length:" + message.length() + "HEADBODY" + message + "BODY";
return message;
}
}
}
3:序列化對象
JBoss Marshalling序列化:Java是面向對象的開發語言。傳遞的資料如果是Java對象,應該是最友善且可靠。
/**
* 1. 雙線程組
* 2. Bootstrap配置啟動資訊
* 3. 注冊業務處理Handler
* 4. 綁定服務監聽端口并啟動服務
*/
package com.hhxy.netty.serialized;
import com.hhxy.utils.SerializableFactory4Marshalling;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class Server4Serializable {
// 監聽線程組,監聽用戶端請求
private EventLoopGroup acceptorGroup = null;
// 處理用戶端相關操作線程組,負責處理與用戶端的資料通訊
private EventLoopGroup clientGroup = null;
// 服務啟動相關配置資訊
private ServerBootstrap bootstrap = null;
public Server4Serializable(){
init();
}
private void init(){
acceptorGroup = new NioEventLoopGroup();
clientGroup = new NioEventLoopGroup();
bootstrap = new ServerBootstrap();
// 綁定線程組
bootstrap.group(acceptorGroup, clientGroup);
// 設定通訊模式為NIO
bootstrap.channel(NioServerSocketChannel.class);
// 設定緩沖區大小
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// SO_SNDBUF發送緩沖區,SO_RCVBUF接收緩沖區,SO_KEEPALIVE開啟心跳監測(保證連接配接有效)
bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
.option(ChannelOption.SO_RCVBUF, 16*1024)
.option(ChannelOption.SO_KEEPALIVE, true);
}
public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException{
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
ch.pipeline().addLast(acceptorHandlers);
}
});
ChannelFuture future = bootstrap.bind(port).sync();
return future;
}
public void release(){
this.acceptorGroup.shutdownGracefully();
this.clientGroup.shutdownGracefully();
}
public static void main(String[] args){
ChannelFuture future = null;
Server4Serializable server = null;
try{
server = new Server4Serializable();
future = server.doAccept(9999,new Server4SerializableHandler());
System.out.println("server started.");
future.channel().closeFuture().sync();
}catch(InterruptedException e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != server){
server.release();
}
}
}
}
/**
* @Sharable注解 -
* 代表目前Handler是一個可以分享的處理器。也就意味着,伺服器注冊此Handler後,可以分享給多個用戶端同時使用。
* 如果不使用注解描述類型,則每次用戶端請求時,必須為用戶端重新建立一個新的Handler對象。
*
*/
package com.hhxy.netty.serialized;
import io.netty.channel.ChannelHandler.Sharable;
import com.hhxy.utils.GzipUtils;
import com.hhxy.utils.RequestMessage;
import com.hhxy.utils.ResponseMessage;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
@Sharable
public class Server4SerializableHandler extends ChannelHandlerAdapter {
// 業務處理邏輯
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("from client : ClassName - " + msg.getClass().getName()
+ " ; message : " + msg.toString());
if(msg instanceof RequestMessage){
RequestMessage request = (RequestMessage)msg;
byte[] attachment = GzipUtils.unzip(request.getAttachment());
System.out.println(new String(attachment));
}
ResponseMessage response = new ResponseMessage(0L, "test response");
ctx.writeAndFlush(response);
}
// 異常處理邏輯
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server exceptionCaught method run...");
cause.printStackTrace();
ctx.close();
}
}
/**
* 1. 單線程組
* 2. Bootstrap配置啟動資訊
* 3. 注冊業務處理Handler
* 4. connect連接配接服務,并發起請求
*/
package com.hhxy.netty.serialized;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import com.hhxy.utils.GzipUtils;
import com.hhxy.utils.RequestMessage;
import com.hhxy.utils.SerializableFactory4Marshalling;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Client4Serializable {
// 處理請求和處理服務端響應的線程組
private EventLoopGroup group = null;
// 服務啟動相關配置資訊
private Bootstrap bootstrap = null;
public Client4Serializable(){
init();
}
private void init(){
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
// 綁定線程組
bootstrap.group(group);
// 設定通訊模式為NIO
bootstrap.channel(NioSocketChannel.class);
}
public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException{
this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
ch.pipeline().addLast(handlers);
}
});
ChannelFuture future = this.bootstrap.connect(host, port).sync();
return future;
}
public void release(){
this.group.shutdownGracefully();
}
public static void main(String[] args) {
Client4Serializable client = null;
ChannelFuture future = null;
try{
client = new Client4Serializable();
future = client.doRequest("localhost", 9999, new Client4SerializableHandler());
String attachment = "test attachment";
byte[] attBuf = attachment.getBytes();
attBuf = GzipUtils.zip(attBuf);
RequestMessage msg = new RequestMessage(new Random().nextLong(),
"test", attBuf);
future.channel().writeAndFlush(msg);
TimeUnit.SECONDS.sleep(1);
future.addListener(ChannelFutureListener.CLOSE);
}catch(Exception e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != client){
client.release();
}
}
}
}
package com.hhxy.netty.serialized;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class Client4SerializableHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("from server : ClassName - " + msg.getClass().getName()
+ " ; message : " + msg.toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exceptionCaught method run...");
cause.printStackTrace();
ctx.close();
}
}
4:定時斷線重連
用戶端斷線重連機制。
用戶端數量多,且需要傳遞的資料量級較大。可以周期性的發送資料的時候,使用。要求對資料的即時性不高的時候,才可使用。
優點: 可以使用資料緩存。不是每條資料進行一次資料互動。可以定時回收資源,對資源使用率高。相對來說,即時性可以通過其他方式保證。如: 120秒自動斷線。資料變化1000次請求伺服器一次。300秒中自動發送不足1000次的變化資料。
/**
* 1. 雙線程組
* 2. Bootstrap配置啟動資訊
* 3. 注冊業務處理Handler
* 4. 綁定服務監聽端口并啟動服務
*/
package com.hhxy.netty.timer;
import com.hhxy.utils.SerializableFactory4Marshalling;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
public class Server4Timer {
// 監聽線程組,監聽用戶端請求
private EventLoopGroup acceptorGroup = null;
// 處理用戶端相關操作線程組,負責處理與用戶端的資料通訊
private EventLoopGroup clientGroup = null;
// 服務啟動相關配置資訊
private ServerBootstrap bootstrap = null;
public Server4Timer(){
init();
}
private void init(){
acceptorGroup = new NioEventLoopGroup();
clientGroup = new NioEventLoopGroup();
bootstrap = new ServerBootstrap();
// 綁定線程組
bootstrap.group(acceptorGroup, clientGroup);
// 設定通訊模式為NIO
bootstrap.channel(NioServerSocketChannel.class);
// 設定緩沖區大小
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// SO_SNDBUF發送緩沖區,SO_RCVBUF接收緩沖區,SO_KEEPALIVE開啟心跳監測(保證連接配接有效)
bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
.option(ChannelOption.SO_RCVBUF, 16*1024)
.option(ChannelOption.SO_KEEPALIVE, true);
// 增加日志Handler,日志級别為info
// bootstrap.handler(new LoggingHandler(LogLevel.INFO));
}
public ChannelFuture doAccept(int port) throws InterruptedException{
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
// 定義一個定時斷線處理器,當多長時間内,沒有任何的可讀取資料,自動斷開連接配接。
// 構造參數,就是間隔時長。 預設的機關是秒。
// 自定義間隔時長機關。 new ReadTimeoutHandler(long times, TimeUnit unit);
ch.pipeline().addLast(new ReadTimeoutHandler(3));
ch.pipeline().addLast(new Server4TimerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
return future;
}
public void release(){
this.acceptorGroup.shutdownGracefully();
this.clientGroup.shutdownGracefully();
}
public static void main(String[] args){
ChannelFuture future = null;
Server4Timer server = null;
try{
server = new Server4Timer();
future = server.doAccept(9999);
System.out.println("server started.");
future.channel().closeFuture().sync();
}catch(InterruptedException e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != server){
server.release();
}
}
}
}
/**
* @Sharable注解 -
* 代表目前Handler是一個可以分享的處理器。也就意味着,伺服器注冊此Handler後,可以分享給多個用戶端同時使用。
* 如果不使用注解描述類型,則每次用戶端請求時,必須為用戶端重新建立一個新的Handler對象。
*
*/
package com.hhxy.netty.timer;
import io.netty.channel.ChannelHandler.Sharable;
import com.hhxy.utils.GzipUtils;
import com.hhxy.utils.RequestMessage;
import com.hhxy.utils.ResponseMessage;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
@Sharable
public class Server4TimerHandler extends ChannelHandlerAdapter {
// 業務處理邏輯
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("from client : ClassName - " + msg.getClass().getName()
+ " ; message : " + msg.toString());
ResponseMessage response = new ResponseMessage(0L, "test response");
ctx.writeAndFlush(response);
}
// 異常處理邏輯
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}
}
/**
* 1. 單線程組
* 2. Bootstrap配置啟動資訊
* 3. 注冊業務處理Handler
* 4. connect連接配接服務,并發起請求
*/
package com.hhxy.netty.timer;
import java.hhxy.Random;
import java.hhxy.concurrent.TimeUnit;
import com.hhxy.utils.RequestMessage;
import com.hhxy.utils.SerializableFactory4Marshalling;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.WriteTimeoutHandler;
public class Client4Timer {
// 處理請求和處理服務端響應的線程組
private EventLoopGroup group = null;
// 服務啟動相關配置資訊
private Bootstrap bootstrap = null;
private ChannelFuture future = null;
public Client4Timer(){
init();
}
private void init(){
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
// 綁定線程組
bootstrap.group(group);
// 設定通訊模式為NIO
bootstrap.channel(NioSocketChannel.class);
// bootstrap.handler(new LoggingHandler(LogLevel.INFO));
}
public void setHandlers() throws InterruptedException{
this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
// 寫操作自定斷線。 在指定時間内,沒有寫操作,自動斷線。
ch.pipeline().addLast(new WriteTimeoutHandler(3));
ch.pipeline().addLast(new Client4TimerHandler());
}
});
}
public ChannelFuture getChannelFuture(String host, int port) throws InterruptedException{
if(future == null){
future = this.bootstrap.connect(host, port).sync();
}
if(!future.channel().isActive()){
future = this.bootstrap.connect(host, port).sync();
}
return future;
}
public void release(){
this.group.shutdownGracefully();
}
public static void main(String[] args) {
Client4Timer client = null;
ChannelFuture future = null;
try{
client = new Client4Timer();
client.setHandlers();
future = client.getChannelFuture("localhost", 9999);
for(int i = 0; i < 3; i++){
RequestMessage msg = new RequestMessage(new Random().nextLong(),
"test"+i, new byte[0]);
future.channel().writeAndFlush(msg);
TimeUnit.SECONDS.sleep(2);
}
TimeUnit.SECONDS.sleep(5);
future = client.getChannelFuture("localhost", 9999);
RequestMessage msg = new RequestMessage(new Random().nextLong(),
"test", new byte[0]);
future.channel().writeAndFlush(msg);
}catch(Exception e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != client){
client.release();
}
}
}
}
package com.hhxy.netty.timer;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class Client4TimerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("from server : ClassName - " + msg.getClass().getName()
+ " ; message : " + msg.toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exceptionCaught method run...");
cause.printStackTrace();
ctx.close();
}
/**
* 當連接配接建立成功後,出發的代碼邏輯。
* 在一次連接配接中隻運作唯一一次。
* 通常用于實作連接配接确認和資源初始化的。
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client channel active");
}
}
5:心跳監測
使用定時發送消息的方式,實作硬體檢測,達到心态檢測的目的。
心跳監測是用于檢測電腦硬體和軟體資訊的一種技術。如:CPU使用率,磁盤使用率,記憶體使用率,程序情況,線程情況等。
5.1:sigar
需要下載下傳一個zip壓縮包。内部包含若幹sigar需要的作業系統檔案。sigar插件是通過JVM通路作業系統,讀取計算機硬體的一個插件庫。讀取計算機硬體過程中,必須由作業系統提供硬體資訊。硬體資訊是通過作業系統提供的。zip壓縮包中是sigar編寫的作業系統檔案,如:windows中的動态連結庫檔案。
解壓需要的作業系統檔案,将作業系統檔案指派到${Java_home}/bin目錄中。
/**
* 1. 雙線程組
* 2. Bootstrap配置啟動資訊
* 3. 注冊業務處理Handler
* 4. 綁定服務監聽端口并啟動服務
*/
package com.hhxy.netty.heatbeat;
import com.hhxy.utils.SerializableFactory4Marshalling;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class Server4Heatbeat {
// 監聽線程組,監聽用戶端請求
private EventLoopGroup acceptorGroup = null;
// 處理用戶端相關操作線程組,負責處理與用戶端的資料通訊
private EventLoopGroup clientGroup = null;
// 服務啟動相關配置資訊
private ServerBootstrap bootstrap = null;
public Server4Heatbeat(){
init();
}
private void init(){
acceptorGroup = new NioEventLoopGroup();
clientGroup = new NioEventLoopGroup();
bootstrap = new ServerBootstrap();
// 綁定線程組
bootstrap.group(acceptorGroup, clientGroup);
// 設定通訊模式為NIO
bootstrap.channel(NioServerSocketChannel.class);
// 設定緩沖區大小
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
// SO_SNDBUF發送緩沖區,SO_RCVBUF接收緩沖區,SO_KEEPALIVE開啟心跳監測(保證連接配接有效)
bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
.option(ChannelOption.SO_RCVBUF, 16*1024)
.option(ChannelOption.SO_KEEPALIVE, true);
}
public ChannelFuture doAccept(int port) throws InterruptedException{
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
ch.pipeline().addLast(new Server4HeatbeatHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
return future;
}
public void release(){
this.acceptorGroup.shutdownGracefully();
this.clientGroup.shutdownGracefully();
}
public static void main(String[] args){
ChannelFuture future = null;
Server4Heatbeat server = null;
try{
server = new Server4Heatbeat();
future = server.doAccept(9999);
System.out.println("server started.");
future.channel().closeFuture().sync();
}catch(InterruptedException e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != server){
server.release();
}
}
}
}
/**
* @Sharable注解 -
* 代表目前Handler是一個可以分享的處理器。也就意味着,伺服器注冊此Handler後,可以分享給多個用戶端同時使用。
* 如果不使用注解描述類型,則每次用戶端請求時,必須為用戶端重新建立一個新的Handler對象。
*
*/
package com.hhxy.netty.heatbeat;
import java.util.ArrayList;
import java.util.List;
import com.hhxy.utils.HeatbeatMessage;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
@Sharable
public class Server4HeatbeatHandler extends ChannelHandlerAdapter {
private static List<String> credentials = new ArrayList<>();
private static final String HEATBEAT_SUCCESS = "SERVER_RETURN_HEATBEAT_SUCCESS";
public Server4HeatbeatHandler(){
// 初始化用戶端清單資訊。一般通過配置檔案讀取或資料庫讀取。
credentials.add("192.168.199.222_WIN-QIUB2JF5TDP");
}
// 業務處理邏輯
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof String){
this.checkCredential(ctx, msg.toString());
} else if (msg instanceof HeatbeatMessage){
this.readHeatbeatMessage(ctx, msg);
} else {
ctx.writeAndFlush("wrong message").addListener(ChannelFutureListener.CLOSE);
}
}
private void readHeatbeatMessage(ChannelHandlerContext ctx, Object msg){
HeatbeatMessage message = (HeatbeatMessage) msg;
System.out.println(message);
System.out.println("=======================================");
ctx.writeAndFlush("receive heatbeat message");
}
/**
* 身份檢查。檢查用戶端身份是否有效。
* 用戶端身份資訊應該是通過資料庫或資料檔案定制的。
* 身份通過 - 傳回确認消息。
* 身份無效 - 斷開連接配接
* @param ctx
* @param credential
*/
private void checkCredential(ChannelHandlerContext ctx, String credential){
System.out.println(credential);
System.out.println(credentials);
if(credentials.contains(credential)){
ctx.writeAndFlush(HEATBEAT_SUCCESS);
}else{
ctx.writeAndFlush("no credential contains").addListener(ChannelFutureListener.CLOSE);
}
}
// 異常處理邏輯
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("server exceptionCaught method run...");
// cause.printStackTrace();
ctx.close();
}
}
/**
* 1. 單線程組
* 2. Bootstrap配置啟動資訊
* 3. 注冊業務處理Handler
* 4. connect連接配接服務,并發起請求
*/
package com.hhxy.netty.heatbeat;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import com.hhxy.utils.GzipUtils;
import com.hhxy.utils.RequestMessage;
import com.hhxy.utils.SerializableFactory4Marshalling;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Client4Heatbeat {
// 處理請求和處理服務端響應的線程組
private EventLoopGroup group = null;
// 服務啟動相關配置資訊
private Bootstrap bootstrap = null;
public Client4Heatbeat(){
init();
}
private void init(){
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
// 綁定線程組
bootstrap.group(group);
// 設定通訊模式為NIO
bootstrap.channel(NioSocketChannel.class);
}
public ChannelFuture doRequest(String host, int port) throws InterruptedException{
this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
ch.pipeline().addLast(new Client4HeatbeatHandler());
}
});
ChannelFuture future = this.bootstrap.connect(host, port).sync();
return future;
}
public void release(){
this.group.shutdownGracefully();
}
public static void main(String[] args) {
Client4Heatbeat client = null;
ChannelFuture future = null;
try{
client = new Client4Heatbeat();
future = client.doRequest("localhost", 9999);
}catch(Exception e){
e.printStackTrace();
}finally{
if(null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(null != client){
client.release();
}
}
}
}
package com.hhxy.netty.heatbeat;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.hyperic.sigar.CpuPerc;
import org.hyperic.sigar.FileSystem;
import org.hyperic.sigar.Mem;
import org.hyperic.sigar.Sigar;
import com.hhxy.utils.HeatbeatMessage;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
public class Client4HeatbeatHandler extends ChannelHandlerAdapter {
private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
private ScheduledFuture heatbeat;
private InetAddress remoteAddr;
private static final String HEATBEAT_SUCCESS = "SERVER_RETURN_HEATBEAT_SUCCESS";
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 擷取本地INET資訊
this.remoteAddr = InetAddress.getLocalHost();
// 擷取本地計算機名
String computerName = System.getenv().get("COMPUTERNAME");
String credentials = this.remoteAddr.getHostAddress() + "_" + computerName;
System.out.println(credentials);
// 發送到伺服器,作為資訊比對證書
ctx.writeAndFlush(credentials);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try{
if(msg instanceof String){
if(HEATBEAT_SUCCESS.equals(msg)){
this.heatbeat = this.executorService.scheduleWithFixedDelay(new HeatbeatTask(ctx), 0L, 2L, TimeUnit.SECONDS);
System.out.println("client receive - " + msg);
}else{
System.out.println("client receive - " + msg);
}
}
}finally{
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exceptionCaught method run...");
// cause.printStackTrace();
// 回收資源
if(this.heatbeat != null){
this.heatbeat.cancel(true);
this.heatbeat = null;
}
ctx.close();
}
class HeatbeatTask implements Runnable{
private ChannelHandlerContext ctx;
public HeatbeatTask(){
}
public HeatbeatTask(ChannelHandlerContext ctx){
this.ctx = ctx;
}
public void run(){
try {
HeatbeatMessage msg = new HeatbeatMessage();
msg.setIp(remoteAddr.getHostAddress());
Sigar sigar = new Sigar();
// CPU資訊
CpuPerc cpuPerc = sigar.getCpuPerc();
Map<String, Object> cpuMsgMap = new HashMap<>();
cpuMsgMap.put("Combined", cpuPerc.getCombined());
cpuMsgMap.put("User", cpuPerc.getUser());
cpuMsgMap.put("Sys", cpuPerc.getSys());
cpuMsgMap.put("Wait", cpuPerc.getWait());
cpuMsgMap.put("Idle", cpuPerc.getIdle());
// 記憶體資訊
Map<String, Object> memMsgMap = new HashMap<>();
Mem mem = sigar.getMem();
memMsgMap.put("Total", mem.getTotal());
memMsgMap.put("Used", mem.getUsed());
memMsgMap.put("Free", mem.getFree());
// 檔案系統
Map<String, Object> fileSysMsgMap = new HashMap<>();
FileSystem[] list = sigar.getFileSystemList();
fileSysMsgMap.put("FileSysCount", list.length);
List<String> msgList = null;
for(FileSystem fs : list){
msgList = new ArrayList<>();
msgList.add(fs.getDevName() + "總大小: " + sigar.getFileSystemUsage(fs.getDirName()).getTotal() + "KB");
msgList.add(fs.getDevName() + "剩餘大小: " + sigar.getFileSystemUsage(fs.getDirName()).getFree() + "KB");
fileSysMsgMap.put(fs.getDevName(), msgList);
}
msg.setCpuMsgMap(cpuMsgMap);
msg.setMemMsgMap(memMsgMap);
msg.setFileSysMsgMap(fileSysMsgMap);
ctx.writeAndFlush(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
6:HTTP協定處理
使用Netty服務開發。實作HTTP協定處理邏輯。
package com.hhxy.netty.http;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* http協定檔案傳輸
* @author Qixuan.Chen
* 建立時間:2015年5月4日
*/
public class HttpStaticFileServer {
private final int port;//端口
public HttpStaticFileServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();//線程一 //這個是用于serversocketchannel的event
EventLoopGroup workerGroup = new NioEventLoopGroup();//線程二//這個是用于處理accept到的channel
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HttpStaticFileServerInitializer());
b.bind(port).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8089;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8089;
}
new HttpStaticFileServer(port).run();//啟動服務
}
}
package com.hhxy.netty.http;
import static io.netty.handler.codec.http.HttpHeaderNames.CACHE_CONTROL;
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpHeaderNames.DATE;
import static io.netty.handler.codec.http.HttpHeaderNames.EXPIRES;
import static io.netty.handler.codec.http.HttpHeaderNames.IF_MODIFIED_SINCE;
import static io.netty.handler.codec.http.HttpHeaderNames.LAST_MODIFIED;
import static io.netty.handler.codec.http.HttpHeaderNames.LOCATION;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
import static io.netty.handler.codec.http.HttpResponseStatus.FOUND;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Locale;
import java.util.TimeZone;
import java.util.regex.Pattern;
import javax.activation.MimetypesFileTypeMap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelProgressiveFuture;
import io.netty.channel.ChannelProgressiveFutureListener;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderUtil;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.CharsetUtil;
/**
* A simple handler that serves incoming HTTP requests to send their respective
* HTTP responses. It also implements {@code 'If-Modified-Since'} header to
* take advantage of browser cache, as described in
* <a href="http://tools.ietf.org/html/rfc2616#section-14.25" target="_blank" rel="external nofollow" >RFC 2616</a>.
*
* <h3>How Browser Caching Works</h3>
*
* Web browser caching works with HTTP headers as illustrated by the following
* sample:
* <ol>
* <li>Request #1 returns the content of {@code /file1.txt}.</li>
* <li>Contents of {@code /file1.txt} is cached by the browser.</li>
* <li>Request #2 for {@code /file1.txt} does return the contents of the
* file again. Rather, a 304 Not Modified is returned. This tells the
* browser to use the contents stored in its cache.</li>
* <li>The server knows the file has not been modified because the
* {@code If-Modified-Since} date is the same as the file's last
* modified date.</li>
* </ol>
*
* <pre>
* Request #1 Headers
* ===================
* GET /file1.txt HTTP/1.1
*
* Response #1 Headers
* ===================
* HTTP/1.1 200 OK
* Date: Tue, 01 Mar 2011 22:44:26 GMT
* Last-Modified: Wed, 30 Jun 2010 21:36:48 GMT
* Expires: Tue, 01 Mar 2012 22:44:26 GMT
* Cache-Control: private, max-age=31536000
*
* Request #2 Headers
* ===================
* GET /file1.txt HTTP/1.1
* If-Modified-Since: Wed, 30 Jun 2010 21:36:48 GMT
*
* Response #2 Headers
* ===================
* HTTP/1.1 304 Not Modified
* Date: Tue, 01 Mar 2011 22:44:28 GMT
*
* </pre>
*/
public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
public static final String HTTP_DATE_GMT_TIMEZONE = "GMT";
public static final int HTTP_CACHE_SECONDS = 60;
private final boolean useSendFile;
public HttpStaticFileServerHandler(boolean useSendFile) {
this.useSendFile = useSendFile;
}
/**
* 類似channelRead方法。
*/
@Override
public void messageReceived(
ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if (!request.decoderResult().isSuccess()) {
sendError(ctx, BAD_REQUEST);
return;
}
if (request.method() != GET) {
sendError(ctx, METHOD_NOT_ALLOWED);
return;
}
final String uri = request.uri();
System.out.println("-----uri----"+uri);
final String path = sanitizeUri(uri);
System.out.println("-----path----"+path);
if (path == null) {
sendError(ctx, FORBIDDEN);
return;
}
File file = new File(path);
if (file.isHidden() || !file.exists()) {
sendError(ctx, NOT_FOUND);
return;
}
if (file.isDirectory()) {
if (uri.endsWith("/")) {
sendListing(ctx, file);
} else {
sendRedirect(ctx, uri + '/');
}
return;
}
if (!file.isFile()) {
sendError(ctx, FORBIDDEN);
return;
}
// Cache Validation
String ifModifiedSince = (String) request.headers().get(IF_MODIFIED_SINCE);
if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) {
SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince);
// Only compare up to the second because the datetime format we send to the client
// does not have milliseconds
long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000;
long fileLastModifiedSeconds = file.lastModified() / 1000;
if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) {
sendNotModified(ctx);
return;
}
}
RandomAccessFile raf;
try {
raf = new RandomAccessFile(file, "r");
} catch (FileNotFoundException fnfe) {
sendError(ctx, NOT_FOUND);
return;
}
long fileLength = raf.length();
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
//setContentLength(response, fileLength);
HttpHeaderUtil.setContentLength(response, fileLength);
setContentTypeHeader(response, file);
setDateAndCacheHeaders(response, file);
if (HttpHeaderUtil.isKeepAlive(request)) {
response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
// Write the initial line and the header.
ctx.write(response);
// Write the content.
ChannelFuture sendFileFuture;
if (useSendFile) {
sendFileFuture =
ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
} else {
sendFileFuture =
ctx.write(new ChunkedFile(raf, 0, fileLength, 8192), ctx.newProgressivePromise());
}
sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
if (total < 0) { // total unknown
System.err.println("Transfer progress: " + progress);
} else {
System.err.println("Transfer progress: " + progress + " / " + total);
}
}
@Override
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
System.err.println("Transfer complete.");
}
});
// Write the end marker
ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
// Decide whether to close the connection or not.
if (!HttpHeaderUtil.isKeepAlive(request)) {
// Close the connection when the whole content is written out.
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (ctx.channel().isActive()) {
sendError(ctx, INTERNAL_SERVER_ERROR);
}
}
private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");
/**
* 路徑解碼
* @param uri
* @return
*/
private static String sanitizeUri(String uri) {
// Decode the path.
try {
uri = URLDecoder.decode(uri, "UTF-8");
} catch (UnsupportedEncodingException e) {
try {
uri = URLDecoder.decode(uri, "ISO-8859-1");
} catch (UnsupportedEncodingException e1) {
throw new Error();
}
}
if (!uri.startsWith("/")) {
return null;
}
// Convert file separators.
uri = uri.replace('/', File.separatorChar);
// Simplistic dumb security check.
// You will have to do something serious in the production environment.
if (uri.contains(File.separator + '.') ||
uri.contains('.' + File.separator) ||
uri.startsWith(".") || uri.endsWith(".") ||
INSECURE_URI.matcher(uri).matches()) {
return null;
}
// Convert to absolute path.
return System.getProperty("user.dir") + File.separator + uri;
}
private static final Pattern ALLOWED_FILE_NAME = Pattern.compile("[A-Za-z0-9][-_A-Za-z0-9\\.]*");
private static void sendListing(ChannelHandlerContext ctx, File dir) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8");
StringBuilder buf = new StringBuilder();
String dirPath = dir.getPath();
buf.append("<!DOCTYPE html>\r\n");
buf.append("<html><head><title>");
buf.append("Listing of: ");
buf.append(dirPath);
buf.append("</title></head><body>\r\n");
buf.append("<h3>Listing of: ");
buf.append(dirPath);
buf.append("</h3>\r\n");
buf.append("<ul>");
buf.append("<li><a href=\"../\">..</a></li>\r\n");
for (File f: dir.listFiles()) {
if (f.isHidden() || !f.canRead()) {
continue;
}
String name = f.getName();
if (!ALLOWED_FILE_NAME.matcher(name).matches()) {
continue;
}
buf.append("<li><a href=\"");
buf.append(name);
buf.append("\">");
buf.append(name);
buf.append("</a></li>\r\n");
}
buf.append("</ul></body></html>\r\n");
ByteBuf buffer = Unpooled.copiedBuffer(buf, CharsetUtil.UTF_8);
response.content().writeBytes(buffer);
buffer.release();
// Close the connection as soon as the error message is sent.
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
private static void sendRedirect(ChannelHandlerContext ctx, String newUri) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND);
response.headers().set(LOCATION, newUri);
// Close the connection as soon as the error message is sent.
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
FullHttpResponse response = new DefaultFullHttpResponse(
HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
// Close the connection as soon as the error message is sent.
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
/**
* When file timestamp is the same as what the browser is sending up, send a "304 Not Modified"
*
* @param ctx
* Context
*/
private static void sendNotModified(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED);
setDateHeader(response);
// Close the connection as soon as the error message is sent.
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
/**
* Sets the Date header for the HTTP response
*
* @param response
* HTTP response
*/
private static void setDateHeader(FullHttpResponse response) {
SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));
Calendar time = new GregorianCalendar();
response.headers().set(DATE, dateFormatter.format(time.getTime()));
}
/**
* Sets the Date and Cache headers for the HTTP Response
*
* @param response
* HTTP response
* @param fileToCache
* file to extract content type
*/
private static void setDateAndCacheHeaders(HttpResponse response, File fileToCache) {
SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));
// Date header
Calendar time = new GregorianCalendar();
response.headers().set(DATE, dateFormatter.format(time.getTime()));
// Add cache headers
time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
response.headers().set(EXPIRES, dateFormatter.format(time.getTime()));
response.headers().set(CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS);
response.headers().set(
LAST_MODIFIED, dateFormatter.format(new Date(fileToCache.lastModified())));
}
/**
* Sets the content type header for the HTTP Response
*
* @param response
* HTTP response
* @param file
* file to extract content type
*/
private static void setContentTypeHeader(HttpResponse response, File file) {
MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath()));
}
}
package com.hhxy.netty.http;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
public class HttpStaticFileServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = ch.pipeline();
// Uncomment the following line if you want HTTPS
//SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
//engine.setUseClientMode(false);
//pipeline.addLast("ssl", new SslHandler(engine));
/**
* (1)ReadTimeoutHandler,用于控制讀取資料的時候的逾時,10表示如果10秒鐘都沒有資料讀取了,那麼就引發逾時,然後關閉目前的channel
(2)WriteTimeoutHandler,用于控制資料輸出的時候的逾時,構造參數1表示如果持續1秒鐘都沒有資料寫了,那麼就逾時。
(3)HttpRequestrianDecoder,這個handler用于從讀取的資料中将http封包資訊解析出來,無非就是什麼requestline,header,body什麼的。。。
(4)然後HttpObjectAggregator則是用于将上賣解析出來的http封包的資料組裝成為封裝好的httprequest對象。。
(5)HttpresponseEncoder,用于将使用者傳回的httpresponse編碼成為http封包格式的資料
(6)HttpHandler,自定義的handler,用于處理接收到的http請求。
*/
pipeline.addLast("decoder", new HttpRequestDecoder());// http-request解碼器,http伺服器端對request解碼
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));//對傳輸檔案大少進行限制
pipeline.addLast("encoder", new HttpResponseEncoder());//http-response解碼器,http伺服器端對response編碼
// 向用戶端發送資料的一個Handler
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
pipeline.addLast("handler", new HttpStaticFileServerHandler(true)); // Specify false if SSL.(如果是ssl,就指定為false)
}
}
7:其他輔助代碼
package com.hhxy.utils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public class GzipUtils {
public static void main(String[] args) throws Exception {
FileInputStream fis = new FileInputStream("D:\\3\\1.jpg");
byte[] temp = new byte[fis.available()];
int length = fis.read(temp);
System.out.println("長度 : " + length);
byte[] zipArray = GzipUtils.zip(temp);
System.out.println("壓縮後的長度 : " + zipArray.length);
byte[] unzipArray = GzipUtils.unzip(zipArray);
System.out.println("解壓縮後的長度 : " + unzipArray.length);
FileOutputStream fos = new FileOutputStream("D:\\3\\101.jpg");
fos.write(unzipArray);
fos.flush();
fos.close();
fis.close();
}
/**
* 解壓縮
* @param source 源資料。需要解壓的資料。
* @return 解壓後的資料。 恢複的資料。
* @throws Exception
*/
public static byte[] unzip(byte[] source) throws Exception{
ByteArrayOutputStream out = new ByteArrayOutputStream();
ByteArrayInputStream in = new ByteArrayInputStream(source);
// JDK提供的。 專門用于壓縮使用的流對象。可以處理位元組數組資料。
GZIPInputStream zipIn = new GZIPInputStream(in);
byte[] temp = new byte[256];
int length = 0;
while((length = zipIn.read(temp, 0, temp.length)) != -1){
out.write(temp, 0, length);
}
// 将位元組數組輸出流中的資料,轉換為一個位元組數組。
byte[] target = out.toByteArray();
zipIn.close();
out.close();
return target;
}
/**
* 壓縮
* @param source 源資料,需要壓縮的資料
* @return 壓縮後的資料。
* @throws Exception
*/
public static byte[] zip(byte[] source) throws Exception{
ByteArrayOutputStream out = new ByteArrayOutputStream();
// 輸出流,JDK提供的,提供解壓縮功能。
GZIPOutputStream zipOut = new GZIPOutputStream(out);
// 将壓縮資訊寫入到記憶體。 寫入的過程會實作解壓。
zipOut.write(source);
// 結束。
zipOut.finish();
byte[] target = out.toByteArray();
zipOut.close();
return target;
}
}
package com.hhxy.utils;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Properties;
import org.hyperic.sigar.CpuInfo;
import org.hyperic.sigar.CpuPerc;
import org.hyperic.sigar.FileSystem;
import org.hyperic.sigar.FileSystemUsage;
import org.hyperic.sigar.Mem;
import org.hyperic.sigar.NetFlags;
import org.hyperic.sigar.NetInterfaceConfig;
import org.hyperic.sigar.NetInterfaceStat;
import org.hyperic.sigar.OperatingSystem;
import org.hyperic.sigar.Sigar;
import org.hyperic.sigar.SigarException;
import org.hyperic.sigar.Swap;
import org.hyperic.sigar.Who;
public class OSUtils {
public static void main(String[] args) {
try {
// System資訊,從jvm擷取
property();
System.out.println("----------------------------------");
// cpu資訊
cpu();
System.out.println("----------------------------------");
// 記憶體資訊
memory();
System.out.println("----------------------------------");
// 作業系統資訊
os();
System.out.println("----------------------------------");
// 使用者資訊
who();
System.out.println("----------------------------------");
// 檔案系統資訊
file();
System.out.println("----------------------------------");
// 網絡資訊
net();
System.out.println("----------------------------------");
// 以太網資訊
ethernet();
System.out.println("----------------------------------");
} catch (Exception e1) {
e1.printStackTrace();
}
}
private static void property() throws UnknownHostException {
Runtime r = Runtime.getRuntime();
Properties props = System.getProperties();
InetAddress addr;
addr = InetAddress.getLocalHost();
String ip = addr.getHostAddress();
Map<String, String> map = System.getenv();
String userName = map.get("USERNAME");// 擷取使用者名
String computerName = map.get("COMPUTERNAME");// 擷取計算機名
String userDomain = map.get("USERDOMAIN");// 擷取計算機域名
System.out.println("使用者名: " + userName);
System.out.println("計算機名: " + computerName);
System.out.println("計算機域名: " + userDomain);
System.out.println("本地ip位址: " + ip);
System.out.println("本地主機名: " + addr.getHostName());
System.out.println("JVM可以使用的總記憶體: " + r.totalMemory());
System.out.println("JVM可以使用的剩餘記憶體: " + r.freeMemory());
System.out.println("JVM可以使用的處理器個數: " + r.availableProcessors());
System.out.println("Java的運作環境版本: " + props.getProperty("java.version"));
System.out.println("Java的運作環境供應商: " + props.getProperty("java.vendor"));
System.out.println("Java供應商的URL: " + props.getProperty("java.vendor.url"));
System.out.println("Java的安裝路徑: " + props.getProperty("java.home"));
System.out.println("Java的虛拟機規範版本: " + props.getProperty("java.vm.specification.version"));
System.out.println("Java的虛拟機規範供應商: " + props.getProperty("java.vm.specification.vendor"));
System.out.println("Java的虛拟機規範名稱: " + props.getProperty("java.vm.specification.name"));
System.out.println("Java的虛拟機實作版本: " + props.getProperty("java.vm.version"));
System.out.println("Java的虛拟機實作供應商: " + props.getProperty("java.vm.vendor"));
System.out.println("Java的虛拟機實作名稱: " + props.getProperty("java.vm.name"));
System.out.println("Java運作時環境規範版本: " + props.getProperty("java.specification.version"));
System.out.println("Java運作時環境規範供應商: " + props.getProperty("java.specification.vender"));
System.out.println("Java運作時環境規範名稱: " + props.getProperty("java.specification.name"));
System.out.println("Java的類格式版本号: " + props.getProperty("java.class.version"));
System.out.println("Java的類路徑: " + props.getProperty("java.class.path"));
System.out.println("加載庫時搜尋的路徑清單: " + props.getProperty("java.library.path"));
System.out.println("預設的臨時檔案路徑: " + props.getProperty("java.io.tmpdir"));
System.out.println("一個或多個擴充目錄的路徑: " + props.getProperty("java.ext.dirs"));
System.out.println("作業系統的名稱: " + props.getProperty("os.name"));
System.out.println("作業系統的構架: " + props.getProperty("os.arch"));
System.out.println("作業系統的版本: " + props.getProperty("os.version"));
System.out.println("檔案分隔符: " + props.getProperty("file.separator"));
System.out.println("路徑分隔符: " + props.getProperty("path.separator"));
System.out.println("行分隔符: " + props.getProperty("line.separator"));
System.out.println("使用者的賬戶名稱: " + props.getProperty("user.name"));
System.out.println("使用者的主目錄: " + props.getProperty("user.home"));
System.out.println("使用者的目前工作目錄: " + props.getProperty("user.dir"));
}
private static void memory() throws SigarException {
Sigar sigar = new Sigar();
Mem mem = sigar.getMem();
// 記憶體總量
System.out.println("記憶體總量: " + mem.getTotal() / 1024L + "K av");
// 目前記憶體使用量
System.out.println("目前記憶體使用量: " + mem.getUsed() / 1024L + "K used");
// 目前記憶體剩餘量
System.out.println("目前記憶體剩餘量: " + mem.getFree() / 1024L + "K free");
Swap swap = sigar.getSwap();
// 交換區總量
System.out.println("交換區總量: " + swap.getTotal() / 1024L + "K av");
// 目前交換區使用量
System.out.println("目前交換區使用量: " + swap.getUsed() / 1024L + "K used");
// 目前交換區剩餘量
System.out.println("目前交換區剩餘量: " + swap.getFree() / 1024L + "K free");
}
private static void cpu() throws SigarException {
Sigar sigar = new Sigar();
CpuInfo infos[] = sigar.getCpuInfoList();
CpuPerc cpuList[] = null;
cpuList = sigar.getCpuPercList();
for (int i = 0; i < infos.length; i++) {// 不管是單塊CPU還是多CPU都适用
CpuInfo info = infos[i];
System.out.println("第" + (i + 1) + "塊CPU資訊");
System.out.println("CPU的總量MHz: " + info.getMhz());// CPU的總量MHz
System.out.println("CPU生産商: " + info.getVendor());// 獲得CPU的賣主,如:Intel
System.out.println("CPU類别: " + info.getModel());// 獲得CPU的類别,如:Celeron
System.out.println("CPU緩存數量: " + info.getCacheSize());// 緩沖存儲器數量
printCpuPerc(cpuList[i]);
}
}
private static void printCpuPerc(CpuPerc cpu) {
System.out.println("CPU使用者使用率: " + CpuPerc.format(cpu.getUser()));// 使用者使用率
System.out.println("CPU系統使用率: " + CpuPerc.format(cpu.getSys()));// 系統使用率
System.out.println("CPU目前等待率: " + CpuPerc.format(cpu.getWait()));// 目前等待率
System.out.println("CPU目前錯誤率: " + CpuPerc.format(cpu.getNice()));//
System.out.println("CPU目前空閑率: " + CpuPerc.format(cpu.getIdle()));// 目前空閑率
System.out.println("CPU總的使用率: " + CpuPerc.format(cpu.getCombined()));// 總的使用率
}
private static void os() {
OperatingSystem OS = OperatingSystem.getInstance();
// 作業系統核心類型如: 386、486、586等x86
System.out.println("作業系統: " + OS.getArch());
System.out.println("作業系統CpuEndian(): " + OS.getCpuEndian());//
System.out.println("作業系統DataModel(): " + OS.getDataModel());//
// 系統描述
System.out.println("作業系統的描述: " + OS.getDescription());
// 作業系統類型
// System.out.println("OS.getName(): " + OS.getName());
// System.out.println("OS.getPatchLevel(): " + OS.getPatchLevel());//
// 作業系統的賣主
System.out.println("作業系統的賣主: " + OS.getVendor());
// 賣主名稱
System.out.println("作業系統的賣主名: " + OS.getVendorCodeName());
// 作業系統名稱
System.out.println("作業系統名稱: " + OS.getVendorName());
// 作業系統賣主類型
System.out.println("作業系統賣主類型: " + OS.getVendorVersion());
// 作業系統的版本号
System.out.println("作業系統的版本号: " + OS.getVersion());
}
private static void who() throws SigarException {
Sigar sigar = new Sigar();
Who who[] = sigar.getWhoList();
if (who != null && who.length > 0) {
for (int i = 0; i < who.length; i++) {
// System.out.println("目前系統程序表中的使用者名" + String.valueOf(i));
Who _who = who[i];
System.out.println("使用者控制台: " + _who.getDevice());
System.out.println("使用者host: " + _who.getHost());
// System.out.println("getTime(): " + _who.getTime());
// 目前系統程序表中的使用者名
System.out.println("目前系統程序表中的使用者名: " + _who.getUser());
}
}
}
private static void file() throws Exception {
Sigar sigar = new Sigar();
FileSystem fslist[] = sigar.getFileSystemList();
try {
for (int i = 0; i < fslist.length; i++) {
System.out.println("分區的盤符名稱" + i);
FileSystem fs = fslist[i];
// 分區的盤符名稱
System.out.println("盤符名稱: " + fs.getDevName());
// 分區的盤符名稱
System.out.println("盤符路徑: " + fs.getDirName());
System.out.println("盤符标志: " + fs.getFlags());//
// 檔案系統類型,比如 FAT32、NTFS
System.out.println("盤符類型: " + fs.getSysTypeName());
// 檔案系統類型名,比如本地硬碟、光驅、網絡檔案系統等
System.out.println("盤符類型名: " + fs.getTypeName());
// 檔案系統類型
System.out.println("盤符檔案系統類型: " + fs.getType());
FileSystemUsage usage = null;
usage = sigar.getFileSystemUsage(fs.getDirName());
switch (fs.getType()) {
case 0: // TYPE_UNKNOWN :未知
break;
case 1: // TYPE_NONE
break;
case 2: // TYPE_LOCAL_DISK : 本地硬碟
// 檔案系統總大小
System.out.println(fs.getDevName() + "總大小: " + usage.getTotal() + "KB");
// 檔案系統剩餘大小
System.out.println(fs.getDevName() + "剩餘大小: " + usage.getFree() + "KB");
// 檔案系統可用大小
System.out.println(fs.getDevName() + "可用大小: " + usage.getAvail() + "KB");
// 檔案系統已經使用量
System.out.println(fs.getDevName() + "已經使用量: " + usage.getUsed() + "KB");
double usePercent = usage.getUsePercent() * 100D;
// 檔案系統資源的使用率
System.out.println(fs.getDevName() + "資源的使用率: " + usePercent + "%");
break;
case 3:// TYPE_NETWORK :網絡
break;
case 4:// TYPE_RAM_DISK :閃存
break;
case 5:// TYPE_CDROM :光驅
break;
case 6:// TYPE_SWAP :頁面交換
break;
}
System.out.println(fs.getDevName() + "讀出: " + usage.getDiskReads());
System.out.println(fs.getDevName() + "寫入: " + usage.getDiskWrites());
}
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
return;
}
private static void net() throws Exception {
Sigar sigar = new Sigar();
String ifNames[] = sigar.getNetInterfaceList();
for (int i = 0; i < ifNames.length; i++) {
String name = ifNames[i];
NetInterfaceConfig ifconfig = sigar.getNetInterfaceConfig(name);
System.out.println("網絡裝置名: " + name);// 網絡裝置名
System.out.println("IP位址: " + ifconfig.getAddress());// IP位址
System.out.println("子網路遮罩: " + ifconfig.getNetmask());// 子網路遮罩
if ((ifconfig.getFlags() & 1L) <= 0L) {
System.out.println("!IFF_UP...skipping getNetInterfaceStat");
continue;
}
NetInterfaceStat ifstat = sigar.getNetInterfaceStat(name);
System.out.println(name + "接收的總包裹數:" + ifstat.getRxPackets());// 接收的總包裹數
System.out.println(name + "發送的總包裹數:" + ifstat.getTxPackets());// 發送的總包裹數
System.out.println(name + "接收到的總位元組數:" + ifstat.getRxBytes());// 接收到的總位元組數
System.out.println(name + "發送的總位元組數:" + ifstat.getTxBytes());// 發送的總位元組數
System.out.println(name + "接收到的錯誤包數:" + ifstat.getRxErrors());// 接收到的錯誤包數
System.out.println(name + "發送資料包時的錯誤數:" + ifstat.getTxErrors());// 發送資料包時的錯誤數
System.out.println(name + "接收時丢棄的包數:" + ifstat.getRxDropped());// 接收時丢棄的包數
System.out.println(name + "發送時丢棄的包數:" + ifstat.getTxDropped());// 發送時丢棄的包數
}
}
private static void ethernet() throws SigarException {
Sigar sigar = null;
sigar = new Sigar();
String[] ifaces = sigar.getNetInterfaceList();
for (int i = 0; i < ifaces.length; i++) {
NetInterfaceConfig cfg = sigar.getNetInterfaceConfig(ifaces[i]);
if (NetFlags.LOOPBACK_ADDRESS.equals(cfg.getAddress()) || (cfg.getFlags() & NetFlags.IFF_LOOPBACK) != 0
|| NetFlags.NULL_HWADDR.equals(cfg.getHwaddr())) {
continue;
}
System.out.println(cfg.getName() + "IP位址:" + cfg.getAddress());// IP位址
System.out.println(cfg.getName() + "網關廣播位址:" + cfg.getBroadcast());// 網關廣播位址
System.out.println(cfg.getName() + "網卡MAC位址:" + cfg.getHwaddr());// 網卡MAC位址
System.out.println(cfg.getName() + "子網路遮罩:" + cfg.getNetmask());// 子網路遮罩
System.out.println(cfg.getName() + "網卡描述資訊:" + cfg.getDescription());// 網卡描述資訊
System.out.println(cfg.getName() + "網卡類型" + cfg.getType());//
}
}
}
package com.hhxy.utils;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
public class SerializableFactory4Marshalling {
/**
* 建立Jboss Marshalling解碼器MarshallingDecoder
* @return MarshallingDecoder
*/
public static MarshallingDecoder buildMarshallingDecoder() {
//首先通過Marshalling工具類的精通方法擷取Marshalling執行個體對象 參數serial辨別建立的是java序列化工廠對象。
//jboss-marshalling-serial 包提供
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
//建立了MarshallingConfiguration對象,配置了版本号為5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
// 序列化版本。隻要使用JDK5以上版本,version隻能定義為5。
configuration.setVersion(5);
//根據marshallerFactory和configuration建立provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
//建構Netty的MarshallingDecoder對象,倆個參數分别為provider和單個消息序列化後的最大長度
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
return decoder;
}
/**
* 建立Jboss Marshalling編碼器MarshallingEncoder
* @return MarshallingEncoder
*/
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
//建構Netty的MarshallingEncoder對象,MarshallingEncoder用于實作序列化接口的POJO對象序列化為二進制數組
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}
package com.hhxy.utils;
import java.io.Serializable;
public class RequestMessage implements Serializable {
private static final long serialVersionUID = 7084843947860990140L;
private Long id;
private String message;
private byte[] attachment;
@Override
public String toString() {
return "RequestMessage [id=" + id + ", message=" + message + "]";
}
public RequestMessage() {
super();
}
public RequestMessage(Long id, String message, byte[] attachment) {
super();
this.id = id;
this.message = message;
this.attachment = attachment;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public byte[] getAttachment() {
return attachment;
}
public void setAttachment(byte[] attachment) {
this.attachment = attachment;
}
}
package com.hhxy.utils;
import java.io.Serializable;
public class ResponseMessage implements Serializable {
private static final long serialVersionUID = -8134313953478922076L;
private Long id;
private String message;
@Override
public String toString() {
return "ResponseMessage [id=" + id + ", message=" + message + "]";
}
public ResponseMessage() {
super();
}
public ResponseMessage(Long id, String message) {
super();
this.id = id;
this.message = message;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
package com.hhxy.utils;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
public class HeatbeatMessage implements Serializable {
private static final long serialVersionUID = 2827219147304706826L;
private String ip;
private Map<String, Object> cpuMsgMap;
private Map<String, Object> memMsgMap;
private Map<String, Object> fileSysMsgMap;
@Override
public String toString() {
return "HeatbeatMessage [\nip=" + ip
+ ", \ncpuMsgMap=" + cpuMsgMap
+ ", \nmemMsgMap=" + memMsgMap
+ ", \nfileSysMsgMap=" + fileSysMsgMap + "]";
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public Map<String, Object> getCpuMsgMap() {
return cpuMsgMap;
}
public void setCpuMsgMap(Map<String, Object> cpuMsgMap) {
this.cpuMsgMap = cpuMsgMap;
}
public Map<String, Object> getMemMsgMap() {
return memMsgMap;
}
public void setMemMsgMap(Map<String, Object> memMsgMap) {
this.memMsgMap = memMsgMap;
}
public Map<String, Object> getFileSysMsgMap() {
return fileSysMsgMap;
}
public void setFileSysMsgMap(Map<String, Object> fileSysMsgMap) {
this.fileSysMsgMap = fileSysMsgMap;
}
}
五:流資料的傳輸處理
在基于流的傳輸裡比如TCP/IP,接收到的資料會先被存儲到一個socket接收緩沖裡。不幸的是,基于流的傳輸并不是一個資料包隊列,而是一個位元組隊列。即使你發送了2個獨立的資料包,作業系統也不會作為2個消息處理而僅僅是作為一連串的位元組而言。是以這是不能保證你遠端寫入的資料就會準确地讀取。是以一個接收方不管他是用戶端還是服務端,都應該把接收到的資料整理成一個或者多個更有意思并且能夠讓程式的業務邏輯更好了解的資料。
在處理流資料粘包拆包時,可以使用下述處理方式:
使用定長資料處理,如:每個完整請求資料長度為8位元組等。(FixedLengthFrameDecoder)
使用特殊分隔符的方式處理,如:每個完整請求資料末尾使用’\0’作為資料結束标記。(DelimiterBasedFrameDecoder)
使用自定義協定方式處理,如:http協定格式等。
使用POJO來替代傳遞的流資料,如:每個完整的請求資料都是一個RequestMessage對象,在Java語言中,使用POJO更符合語種特性,推薦使用。