天天看點

Netty4(三)多連接配接用戶端設計與實作目标UML類圖實作測試

本文介紹多連接配接的netty用戶端設計
  • 目标
  • UML類圖
  • 實作
  • 測試
  • Netty4(二)服務端和用戶端實作

目标

Netty(二)一文中實作了單連接配接用戶端,也就是說用戶端隻有一個連接配接,這就不利于高并發RPC的設計,本文嘗試設計一個多連接配接的用戶端,支援斷線重連

UML類圖

Netty4(三)多連接配接用戶端設計與實作目标UML類圖實作測試

實作

多連接配接用戶端

package com.mym.netty.client;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 多連接配接用戶端
 */
public class MutilClient {

    /**服務類*/
    private Bootstrap bootstrap = new Bootstrap();

    /**會話集合*/
    private List<Channel>  channels = new ArrayList<Channel>();

    /**引用計數*/
    private final AtomicInteger index = new AtomicInteger();

    /**初始化*/
    public void init(int count){
        //worker
        EventLoopGroup worker = new NioEventLoopGroup();

        //設定工作線程
        this.bootstrap.group(worker);

        //初始化channel
        bootstrap.channel(NioSocketChannel.class);

        //設定handler管道
        bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel channel) throws Exception {
                channel.pipeline().addLast(new StringDecoder());
                channel.pipeline().addLast(new StringEncoder());
                channel.pipeline().addLast(new ClientHandler());
            }
        });

        //根據連接配接數建立連接配接
        for(int i = ;i < count;i++){
            ChannelFuture channelFuture = bootstrap.connect("0.0.0.0",);
            channels.add(channelFuture.channel());
        }

    }

    /**擷取channel(會話)*/
    public Channel nextChannel(){
        return getFirstActiveChannel();
    }

    private Channel getFirstActiveChannel(int count) {
        Channel channel = channels.get(Math.abs(index.getAndIncrement() % channels.size()));
        if(!channel.isActive()){
            //重連
            reconect(channel);
            if(count > channels.size()){
                throw new RuntimeException("no Idle channel!");
            }

            return getFirstActiveChannel(count + );
        }
        return channel;
    }

    /**重連*/
    private void reconect(Channel channel) {
        //此處可改為原子操作
        synchronized(channel){
            if(channels.indexOf(channel) == -){
                return ;
            }

            Channel newChannel = bootstrap.connect("0.0.0.0", ).channel();
            channels.set(channels.indexOf(channel), newChannel);

            System.out.println(channels.indexOf(channel) + "位置的channel成功進行重連!");
        }
    }

}
           

本類采用對象組的方式儲存連接配接。因為一個thread + 隊列 == 一個單線程線程池 是線程安全的,任務是線性串行執行的

用戶端handler

package com.mym.netty.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("client receive msg:"+msg.toString());
    }
}
           

測試類

package com.mym.netty.client;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class StartClient {

    public static void main(String[] args) {
        mutilClient();
    }

    public static void mutilClient(){
        MutilClient client = new MutilClient();
        client.init();

        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        while(true){
            try {
                System.out.println("請輸入:");
                String msg = bufferedReader.readLine();
                client.nextChannel().writeAndFlush(msg);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}
           

測試

測試步驟:連接配接服務端後,用戶端先向服務端發送消息,用戶端進行斷網,然後開網,然後再想服務端發送消息

用戶端輸出如下:

請輸入:
nihao
client receive msg:this is ServerHandler reply msg happend at !this is ServerHandler2 reply msg happend at !

此處斷網,一大堆錯。然後重新開網,再次發送消息
請輸入:
hello
-位置的channel成功進行重連!
client receive msg:this is ServerHandler reply msg happend at !
client receive msg:this is ServerHandler2 reply msg happend at !
           
本次實作仍有可優化的地方,歡迎留言給出建議

繼續閱讀