天天看點

搭建生産級的Netty項目

Netty是Trustin Lee在2004年開發的一款高性能的網絡應用程式架構。相比于JDK自帶的NIO,Netty做了相當多的增強,且隔離了jdk nio的實作細節,API也比較友好,還支援流量整形等進階特性。在我們常見的一些開源項目中已經普遍的應用到了Netty,比如Dubbo、Elasticsearch、Zookeeperd等。

Netty的具體開發

提示:因代碼相對較多,這裡隻展示其主要部分,至于項目中用到的編解碼器、工具類,請直接拉到最後下載下傳源碼!也歡迎順手給個Star~

需要的依賴
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>4.1.1</version>
</dependency>
<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-jmx</artifactId>
    <version>4.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.29.Final</version>
</dependency>
           
Client端代碼
package com.example.nettydemo.client;

import com.example.nettydemo.client.codec.*;
import com.example.nettydemo.client.codec.dispatcher.OperationResultFuture;
import com.example.nettydemo.client.codec.dispatcher.RequestPendingCenter;
import com.example.nettydemo.client.codec.dispatcher.ResponseDispatcherHandler;
import com.example.nettydemo.common.RequestMessage;
import com.example.nettydemo.common.string.StringOperation;
import com.example.nettydemo.util.IdUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import javax.net.ssl.SSLException;
import java.util.concurrent.ExecutionException;

public class Client {

    public static void main(String[] args) throws InterruptedException, ExecutionException, SSLException {

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);

        //用戶端連接配接伺服器最大允許時間,預設為30s
        bootstrap.option(NioChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000); //10s

        NioEventLoopGroup group = new NioEventLoopGroup();
        try {

            bootstrap.group(group);

            RequestPendingCenter requestPendingCenter = new RequestPendingCenter();
            LoggingHandler loggingHandler = new LoggingHandler(LogLevel.INFO);

            bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();

                    pipeline.addLast(new FrameDecoder());
                    pipeline.addLast(new FrameEncoder());

                    pipeline.addLast(new ProtocolEncoder());
                    pipeline.addLast(new ProtocolDecoder());

                    pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));
                    pipeline.addLast(new OperationToRequestMessageEncoder());

//                    pipeline.addLast(loggingHandler);

                }
            });

            //連接配接服務
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888);
            //因為future是異步執行,是以需要先連接配接上後,再進行下一步操作
            channelFuture.sync();

            long streamId = IdUtil.nextId();
            /**
             * 發送資料測試,按照定義的規則組裝資料
             */
//            OrderOperation orderOperation =  new OrderOperation(1001, "你好啊,hi");
            RequestMessage requestMessage = new RequestMessage(streamId, new StringOperation(1234, "你好啊,hi"));

            //将future放入center
            OperationResultFuture operationResultFuture = new OperationResultFuture();
            requestPendingCenter.add(streamId, operationResultFuture);

            //發送消息
            for (int i = 0; i < 10; i++) {
                channelFuture.channel().writeAndFlush(requestMessage);
            }

            //阻塞等待結果,結果來了之後會調用ResponseDispatcherHandler去set結果
//            OperationResult operationResult = operationResultFuture.get();
//            //将結果列印
//            System.out.println("傳回:"+operationResult);

            channelFuture.channel().closeFuture().get();

        } finally {
            group.shutdownGracefully();
        }

    }

}

           
Server端代碼
package com.example.nettydemo.server;

import com.example.nettydemo.server.codec.FrameDecoder;
import com.example.nettydemo.server.codec.FrameEncoder;
import com.example.nettydemo.server.codec.ProtocolDecoder;
import com.example.nettydemo.server.codec.ProtocolEncoder;
import com.example.nettydemo.server.handler.MetricsHandler;
import com.example.nettydemo.server.handler.ServerIdleCheckHandler;
import com.example.nettydemo.server.handler.ServerProcessHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor;
import lombok.extern.slf4j.Slf4j;

import javax.net.ssl.SSLException;
import java.security.cert.CertificateException;
import java.util.concurrent.ExecutionException;

/**
 * netty server 入口
 */
@Slf4j
public class Server {


    public static void main(String... args) throws InterruptedException, ExecutionException, CertificateException, SSLException {

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //設定channel模式,因為是server是以使用NioServerSocketChannel
        serverBootstrap.channel(NioServerSocketChannel.class);

        //最大的等待連接配接數量
        serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
        //設定是否啟用 Nagle 算法:用将小的碎片資料連接配接成更大的封包 來提高發送效率。
        //如果需要發送一些較小的封包,則需要禁用該算法
        serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);

        //設定netty自帶的log,并設定級别
        serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));

        //thread
        //使用者指定線程名
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
        NioEventLoopGroup workGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
        UnorderedThreadPoolEventExecutor businessGroup = new UnorderedThreadPoolEventExecutor(10, new DefaultThreadFactory("business"));

        //隻能使用一個線程,因GlobalTrafficShapingHandler比較輕量級
        NioEventLoopGroup eventLoopGroupForTrafficShaping = new NioEventLoopGroup(0, new DefaultThreadFactory("TS"));

        try {
            //設定react方式
            serverBootstrap.group(bossGroup, workGroup);

            //metrics
            MetricsHandler metricsHandler = new MetricsHandler();

            //trafficShaping流量整形
            //long writeLimit 寫入時控制, long readLimit 讀取時控制 具體設定看業務修改
            GlobalTrafficShapingHandler globalTrafficShapingHandler = new GlobalTrafficShapingHandler(eventLoopGroupForTrafficShaping, 10 * 1024 * 1024, 10 * 1024 * 1024);


            //log
            LoggingHandler debugLogHandler = new LoggingHandler(LogLevel.DEBUG);
            LoggingHandler infoLogHandler = new LoggingHandler(LogLevel.INFO);

            //設定childHandler,按執行順序放
            serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {

                    ChannelPipeline pipeline = ch.pipeline();

                    pipeline.addLast("debugLog", debugLogHandler);
                    pipeline.addLast("tsHandler", globalTrafficShapingHandler);
                    pipeline.addLast("metricHandler", metricsHandler);
                    pipeline.addLast("idleHandler", new ServerIdleCheckHandler());

                    pipeline.addLast("frameDecoder", new FrameDecoder());
                    pipeline.addLast("frameEncoder", new FrameEncoder());
                    pipeline.addLast("protocolDecoder", new ProtocolDecoder());
                    pipeline.addLast("protocolEncoder", new ProtocolEncoder());

                    pipeline.addLast("infoLog", infoLogHandler);
                    //對flush增強,減少flush次數犧牲延遲增強吞吐量
                    pipeline.addLast("flushEnhance", new FlushConsolidationHandler(10, true));
                    //為業務處理指定單獨的線程池
                    pipeline.addLast(businessGroup, new ServerProcessHandler());//businessGroup,
                }
            });

            //綁定端口并阻塞啟動
            ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();

            channelFuture.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
            businessGroup.shutdownGracefully();
            eventLoopGroupForTrafficShaping.shutdownGracefully();
        }

    }

}
           

最後

以上介紹了Netty的基本用法,在代碼中也做了一部分的關鍵注釋,但可能還會有許多不足,也不可能滿足所有人的要求,大家可根據自己的實際需求去改造此項目。附上源碼位址netty源碼

持續學習,記錄點滴。更多文章請通路 文章首發