天天看點

RPC項目更新筆記(四):設計一個ZooKeeper注冊中心 ,負載均衡

作者:Java解白
RPC項目更新筆記(四):設計一個ZooKeeper注冊中心 ,負載均衡

前提摘要

  1. 服務端與用戶端通信的host與port是預先就确定好的,每一個用戶端都必須知道對應服務的ip與端口号, 并且如果服務挂了或者換位址了,就很麻煩。擴充性也不強, 是以我們需要引入注冊中心
  2. 在RPC架構中,一般一個服務會有多個提供者支援,如何分散服務提供者的壓力?引入負載均衡 RPC三大要素用戶端和服務端我們都已經有了,現在就隻剩下注冊中心這最後一塊拼圖了!Let's go!
  • Zookeeper

注冊中心的位址是固定的(為了高可用一般是叢集,我們看做黑盒即可),服務端上線時,在注冊中心注冊自己的服務與對應的位址,而用戶端調用服務時,去注冊中心根據服務名找到對應的服務端位址。

  • Curator

Curator是一套Zookeeper用戶端架構,解決了很多 Zookeeper 用戶端非常底層的細節開發工作,這次項目的更新便基于此。

Zookeeper注冊中心

首先在本機安裝Zookeeper,預設端口為2181。

定義服務注冊接口

// 服務注冊接口,兩大基本功能,注冊:儲存服務與位址。 查詢:根據服務名查找位址
public interface ServiceRegister {
    void register(String serviceName, InetSocketAddress serverAddress);
    InetSocketAddress serviceDiscovery(String serviceName);
}
複制代碼           

服務注冊接口的實作類為

public class ZkServiceRegister implements ServiceRegister {
    // curator 提供的zookeeper用戶端
    private CuratorFramework client;
    // zookeeper根路徑節點
    private static final String ROOT_PATH = "MyRPC";

    // 這裡負責zookeeper用戶端的初始化,并與zookeeper服務端建立連接配接
    public ZkServiceRegister(){
        // 指數時間重試
        RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
        // zookeeper的位址固定,不管是服務提供者還是,消費者都要與之建立連接配接
        // sessionTimeoutMs 與 zoo.cfg中的tickTime 有關系,
        // zk還會根據minSessionTimeout與maxSessionTimeout兩個參數重新調整最後的逾時值。預設分别為tickTime 的2倍和20倍
        // 使用心跳監聽狀态
        this.client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(40000).retryPolicy(policy).namespace(ROOT_PATH).build();
        this.client.start();
        System.out.println("zookeeper 連接配接成功");
    }

    @Override
    public void register(String serviceName, InetSocketAddress serverAddress){
        try {
            // serviceName建立成永久節點,服務提供者下線時,不删服務名,隻删位址
            if(client.checkExists().forPath("/" + serviceName) == null){
                client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath("/" + serviceName);
            }
            // 路徑位址,一個/代表一個節點
            String path = "/" + serviceName +"/"+ getServiceAddress(serverAddress);
            // 臨時節點,伺服器下線就删除節點
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
        } catch (Exception e) {
            System.out.println("此服務已存在");
        }
    }
    // 根據服務名傳回位址
    @Override
    public InetSocketAddress serviceDiscovery(String serviceName) {
        try {
            List<String> strings = client.getChildren().forPath("/" + serviceName);
            // 這裡預設用的第一個,後面加負載均衡
            String string = strings.get(0);
            return parseAddress(string);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    // 位址 -> XXX.XXX.XXX.XXX:port 字元串
    private String getServiceAddress(InetSocketAddress serverAddress) {
        return serverAddress.getHostName() +
                ":" +
                serverAddress.getPort();
    }
    // 字元串解析為位址
    private InetSocketAddress parseAddress(String address) {
        String[] result = address.split(":");
        return new InetSocketAddress(result[0], Integer.parseInt(result[1]));
    }
}
複制代碼           

用戶端更新

執行個體用戶端的時候不再需要傳入host,post的值了,執行個體在發送請求即sendRequest的過程中會自動通過ZkServiceRegister擷取到zookeeper注冊中心擷取到目前服務需要發送到位址(服務提供端的所在地。

\\不再需要傳入host,post
RPCClient rpcClient = new NettyRPCClient();
複制代碼           
\\擷取注冊中心存儲的資訊
InetSocketAddress address = serviceRegister.serviceDiscovery(request.getInterfaceName());
host = address.getHostName();
port = address.getPort();
複制代碼           

服務端更新

用戶端不需要再寫入host,post了,相應的服務端就需要給注冊中心傳它能提供的服務以及它的host,port。

ServiceProvider serviceProvider = new ServiceProvider("127.0.0.1", 8899);
複制代碼           

在服務提供類加入注冊的功能

public class ServiceProvider {

    private Map<String, Object> interfaceProvider;
    private String host;
    private int port;
    private ServiceRegister serviceRegister;

    public ServiceProvider(String host, int port){
        this.host = host;
        this.port = port;
        this.serviceRegister = new ZkServiceRegister();
        this.interfaceProvider = new HashMap<>();
    }

    public void provideServiceInterface(Object service){
        Class<?>[] interfaces = service.getClass().getInterfaces();

        for(Class clazz : interfaces){
            interfaceProvider.put(clazz.getName(),service);
            // 在注冊中心注冊服務
            serviceRegister.register(clazz.getName(),new InetSocketAddress(host,port));
        }

    }

    public Object getService(String interfaceName){
        return interfaceProvider.get(interfaceName);
    }
}
複制代碼           

負載均衡

public InetSocketAddress serviceDiscovery(String serviceName) {
    try {
        List<String> strings = client.getChildren().forPath("/" + serviceName);
        // 這裡預設用的第一個,後面加負載均衡
        String string = strings.get(0);
        return parseAddress(string);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}
複制代碼           

上面的代碼中其實還存在一個小問題,每次我們用戶端問注冊中心要服務對應的位址的時候,永遠是給第一個服務者的位址,這顯然是不合理的,我們需要分散服務提供者的壓力。

  • 負載均衡接口
// 給伺服器位址清單,根據不同的負載均衡政策選擇一個
public interface LoadBalance {
    String balance(List<String> addressList);
}

/**
 * 随機負載均衡
 */
public class RandomLoadBalance implements  LoadBalance{
    @Override
    public String balance(List<String> addressList) {

        Random random = new Random();
        int choose = random.nextInt(addressList.size());
        System.out.println("負載均衡選擇了" + choose + "伺服器");
        return addressList.get(choose);
    }
}

/**
 * 輪詢負載均衡
 */
public class RoundLoadBalance implements LoadBalance{
    private int choose = -1;
    @Override
    public String balance(List<String> addressList) {
        choose++;
        choose = choose%addressList.size();
        return addressList.get(choose);
    }
}           

繼續閱讀