一、前言
前面我們探讨了如何擷取某一個Dubbo的服務的提供者清單,本節我們探讨如何使用Dubbo的擴充,實作指定IP調用。
二、實作
在Dubbo中叢集容錯政策Cluster是SPI擴充接口,DUbbo架構提供了豐富的叢集容錯政策實作,本節我們就基于擴充接口實作指定IP調用功能。
首先我們實作擴充接口Cluster:
public class MyCluster implements Cluster{
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MyClusterInvoker(directory);
}
}
然後我們看自己實作的MyClusterInvoker
public class MyClusterInvoker<T> extends MyAbstractClusterInvoker<T> {
public MyClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance)
throws RpcException {
//1.檢視是否設定了指定ip
String ip = (String) RpcContext.getContext().get("ip");
if (StringUtils.isBlank(ip)) {
throw new RuntimeException("ip is blank ");
}
//2.檢查是否有可用invoker
checkInvokers(invokers,invocation);
//3.根據指定ip擷取對應invoker
Invoker<T> invoked = invokers.stream().filter(invoker -> invoker.getUrl().getHost().equals(ip))
.findFirst().orElse(null);
//4.檢查是否有可用invoker
if(null == invoked) {
throw new RpcException(RpcException.NO_INVOKER_AVAILABLE_AFTER_FILTER,
"Failed to invoke the method " + invocation.getMethodName() + " in the service "
+ getInterface().getName() + ". No provider available for the service "
+ directory.getUrl().getServiceKey() + " from ip " + ip + " on the consumer "
+ NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion()
+ ". Please check if the providers have been started and registered.");
}
//5.發起遠端調用,失敗則抛出異常
try {
return invoked.invoke(invocation);
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
throw (RpcException) e;
}
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
"Fail invoke providers " + (invoked != null?invoked.getUrl():"")+ " " + loadbalance.getClass().getSimpleName()
+ " select from all providers " + invokers + " for service " + getInterface().getName()
+ " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
e.getCause() != null ? e.getCause() : e);
}
}
...
}
- 如上代碼1,我們從RpcContext.getContext()擷取了屬性值ip,如果指定了改值說明指定了ip,
- 代碼2則檢查是否有可用的服務提供者,如果沒有則抛出異常。
- 代碼3變量invokers清單查找指定IP對應的Invoker
- 代碼4 檢查是否有對應IP對應的Invoker,沒有則抛出異常。
- 代碼5 具體使用選擇的invoker發起遠端調用。
注意我們還修改了架構的AbstractClusterInvoker為MyAbstractClusterInvoker:
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
List<Invoker<T>> invokers = list(invocation);
LoadBalance loadbalance = null;//initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
這裡我們把 LoadBalance loadbalance = initLoadBalance(invokers, invocation); 修改為了 LoadBalance loadbalance = null;因為我們不需要負載均衡了。
擴充實作寫好後,要把擴充實作配置到下面檔案
然後在消費端調用具體服務前進行下面設定就可以指定ip調用了。
//設定叢集容錯政策為我們自己的
referenceConfig.setCluster("myCluster");
//指定ip,企圖讓ip為30.10.67.231的服務提供者來處理服務
RpcContext.getContext().set("ip", "30.10.67.231");
三、總結
Dubbo是一個高度可擴充的架構,基于SPI的擴充接口,我們可以根據需要定制我們自己的實作,本文我們則基于叢集容錯政策實作了基于ip調用的擴充。