一、前言
dubbo架構本身提供了豐富的負載均衡政策,比如輪詢、随機、最少活躍調用數、一緻性hash等,但是有時候我們需要自己根據業務指定某個ip來進行調用。要指定ip進行調用就需要先知道服務提供者的ip。本文我們先來探讨第一步,當服務注冊中心使用zookeeper時候如何擷取某一個服務的提供端的位址清單。
二、實作
我們知道當服務提供方啟動時候,會注冊服務到服務注冊中心,本文我們通用zookeeper,比如服務com.books.dubbo.demo.api.GreetingService則注冊到zk後,是下面樹形結構
那麼當消費端啟動時候會去zookeeper上訂閱path為/dubbo/com.books.dubbo.demo.api.GreetingService/providers下面的資訊,也就是服務提供者清單資訊,那麼我們就可以基于這個原理來擷取某一個服務提供者清單,然後對資訊進行過濾加工,并且注冊一個監聽器,當服務提供者機器增減後,動态更新儲存的位址清單。
基于上面原理實作代碼如下:
public class ZookeeperIpList {
private String dataId = "com.books.dubbo.demo.api.GreetingService/providers:1.0.0";
private URL CONSUMER_URL;
private static final Joiner j = Joiner.on("|").useForNull("nil");
public final List<String> getIpList() {
return ipList;
}
private volatile List<String> ipList = new ArrayList<String>();
//對擷取的清單内容進行過濾
private static List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
List<URL> urls = new ArrayList<URL>();
if (providers != null && providers.size() > 0) {
urls = providers.stream().map(provider -> URL.decode(provider)).filter(provider -> provider.contains("://"))
.map(provider -> URL.valueOf(provider)).filter(url -> UrlUtils.isMatch(consumer, url))
.collect(Collectors.toList());
}
return urls;
}
// 解析服務提供者位址清單為ip:port格式
private void parseIpList(List<String> ipSet) {
List<URL> urlList = toUrlsWithoutEmpty(CONSUMER_URL, ipSet);
final List<String> ipListTemp = urlList.stream().map(url -> url.getAddress()).collect(Collectors.toList());
this.ipList = ipListTemp;
}
public void init(String zkServerAddr, String zkGroup, String dataId, String serviceGroup) {
// 1.參數校驗
Assert.notNull(zkServerAddr, "zkServerAddr is null.");
Assert.notNull(dataId, "dataId is null.");
Assert.notNull(dataId, "zkGroup is null.");
Assert.notNull(dataId, "serviceGroup is null.");
// 2.拼接訂閱的path
String[] temp = dataId.split(":");
if (temp.length != 2) {
throw new RuntimeException("dataId is illegal");
}
this.dataId = "/" + zkGroup + "/" + temp[0] + "/providers";
String consumeUrl = "consumer://127.0.0.1/?group=" + serviceGroup + "&interface=" + temp[0] + "&version="
+ temp[1];
CONSUMER_URL = URL.valueOf(consumeUrl);
// 3.開啟zk,訂閱path路徑下服務提供者資訊,并添加監聽器
System.out.println(j.join("init zk ", zkServerAddr, this.dataId, consumeUrl));
ZkClient zkClient = new ZkClient(zkServerAddr);
List<String> list = zkClient.subscribeChildChanges(this.dataId, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
// 3.1解析服務提供者位址清單
parseIpList(currentChilds);
try {
System.out.println((j.join("ipList changed:", JSON.json(ipList))));
} catch (IOException e) {
}
}
});
//4. 解析服務提供者ip清單
parseIpList(list);
}
public static void main(String[] a) throws InterruptedException {
ZookeeperIpList zk = new ZookeeperIpList();
zk.init("127.0.0.1:2181", "dubbo", "com.books.dubbo.demo.api.GreetingService:1.0.0", "dubbo");
try {
System.out.println((j.join("parseIpList", JSON.json(zk.getIpList()))));
} catch (IOException e) {
}
Thread.currentThread().join();
}
}
如上代碼main函數建立了一個ZookeeperIpList對象,并且調用其init方法,參數分别為zk位址,zk分組,服務接口以及版本,服務分組。
init方法内首先拼接要訂閱的zk的path,拼接完成後dataid為/dubbo/com.books.dubbo.demo.api.GreetingService/providers,然後建立zkclient訂閱該dataid對應的path,并且注冊監聽器,當path下資訊變化後會得到最新清單。
并且使用parseIpList方法解析擷取的位址清單為ip:port個數,解析完畢後儲存到ipList中。
三、總結
本節介紹了一個簡單的基于zookeeper擷取服務提供者位址清單的方法,後面我們看如何指定ip進行調用。