This article is published on the NetEase Cloud community with the authorization of its author, Zhao Jigang.
I. Usage

The service provider stays unchanged. The consumer-side configuration is as follows:

<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
    <dubbo:method name="sayHello" async="true" timeout="60000"/>
    <dubbo:method name="sayBye" async="true" timeout="60000"/>
</dubbo:reference>

Adding <dubbo:method name="xxx" async="true"/> to the configuration makes the single method xxx asynchronous. If every method of demoService should be asynchronous, configure <dubbo:reference async="true"/> on the reference instead.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.alibaba.dubbo.demo.DemoService;
import com.alibaba.dubbo.rpc.RpcContext;

public class Consumer {

    public static void main(String[] args) throws Exception {
        // Avoid resolving an IPv6 address. Setting the property here only works in debug mode;
        // passing -Djava.net.preferIPv4Stack=true on the command line works in either mode.
        System.setProperty("java.net.preferIPv4Stack", "true");

        asyncFuture2();
    }

    public static void asyncFuture1() throws ExecutionException, InterruptedException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
        context.start();
        DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy

        long start = System.currentTimeMillis();

        // Fetch the Future right after each call, before the next call is made.
        demoService.sayHello("zhangsan");
        Future<String> helloFuture = RpcContext.getContext().getFuture();

        demoService.sayBye("lisi");
        Future<String> byeFuture = RpcContext.getContext().getFuture();

        final String helloStr = helloFuture.get(); // returns after about 5s
        final String byeStr = byeFuture.get();     // returns about 8s after start

        // Total cost is about 8s, not 13s.
        System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis() - start));
    }

    public static void asyncFuture2() throws ExecutionException, InterruptedException {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
        context.start();
        DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy

        long start = System.currentTimeMillis();

        Future<String> helloFuture = RpcContext.getContext().asyncCall(() -> demoService.sayHello("zhangsan"));
        Future<String> byeFuture = RpcContext.getContext().asyncCall(() -> demoService.sayBye("lisi"));

        final String helloStr = helloFuture.get(); // returns after about 5s
        final String byeStr = byeFuture.get();     // returns about 8s after start

        // Total cost is about 8s, not 13s.
        System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis() - start));
    }
}
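The listing above is the Consumer main class. asyncFuture2() is the recommended usage. Note that the Callable (the argument of asyncCall) is only a task; it does not create a thread. asyncFuture2() and asyncFuture1() are therefore equivalent in resource usage: both issue the asynchronous calls from a single thread.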
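The point about Callable can be checked with the JDK alone. The following is a minimal sketch, not Dubbo code: InlineTaskDemo and runInline are hypothetical names, and the sketch only shows that calling a Callable directly executes it on the calling thread.

```java
import java.util.concurrent.Callable;

public class InlineTaskDemo {

    // Hypothetical helper: runs the task on the caller's thread and
    // returns whatever the task returns.
    public static String runInline(Callable<String> task) {
        try {
            return task.call(); // a plain method call, no new thread is started
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        // The task reports the name of the thread that executes it.
        String runner = runInline(() -> Thread.currentThread().getName());
        System.out.println("same thread: " + caller.equals(runner));
    }
}
```

A thread is only involved if the task is handed to an executor or a Thread; passing a lambda as a Callable does neither.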
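II. Source analysis of asyncFuture1()

Start with asyncFuture1(). The overall steps are:
- demoService.sayHello("zhangsan"); creates a Future object and stores it in the current thread's context
- Future<String> helloFuture = RpcContext.getContext().getFuture(); fetches the Future stored in the first step from the current thread's context
- final String helloStr = helloFuture.get(); blocks and waits for the result from the Future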
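The three steps can be imitated with the JDK alone. This is a sketch under stated assumptions, not Dubbo's implementation: ThreadLocalFutureDemo, fakeAsyncCall and CONTEXT are hypothetical stand-ins for the proxy call and the per-thread context, and CompletableFuture plays the role of the remote call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ThreadLocalFutureDemo {

    // Stand-in for the per-thread context (hypothetical, not RpcContext).
    private static final ThreadLocal<Future<String>> CONTEXT = new ThreadLocal<>();

    // Stand-in for an async proxy method: starts the work, stores the
    // future in the thread-local slot, and returns null immediately.
    public static String fakeAsyncCall(String name, long delayMillis) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Hello " + name;
        });
        CONTEXT.set(future); // overwrites whatever the previous call stored
        return null;
    }

    public static Future<String> getFuture() {
        return CONTEXT.get();
    }

    // Issues two calls back to back, then waits for both results.
    public static String[] callBoth() {
        fakeAsyncCall("zhangsan", 200);
        Future<String> first = getFuture();  // fetch before the next call
        fakeAsyncCall("lisi", 300);
        Future<String> second = getFuture();
        try {
            return new String[] {first.get(), second.get()};
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        String[] results = callBoth();
        long cost = System.currentTimeMillis() - start;
        System.out.println(results[0] + " -- " + results[1] + ", cost: " + cost + " ms");
    }
}
```

Because the slot holds a single future per thread in this sketch, each future is fetched right after its call; the two waits overlap, so the total cost is governed by the slower call rather than by the sum of both.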
Main execution flow (for the detailed flow, see the three blog posts referenced at the beginning of this article):

1. demoService.sayHello("zhangsan");
   --> FutureFilter.invoke(final Invoker<?> invoker, final Invocation invocation)
   --> DubboInvoker.doInvoke(final Invocation invocation)

FutureFilter:

public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
    final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);

    fireInvokeCallback(invoker, invocation);
    // need to configure if there's return value before the invocation in order to help invoker to judge if it's
    // necessary to return future.
    Result result = invoker.invoke(invocation);
    if (isAsync) {
        asyncCallback(invoker, invocation);
    } else {
        syncCallback(invoker, invocation, result);
    }
    return result;
}

For the asynchronous calls above (asyncFuture1() and asyncFuture2()), FutureFilter has no effect. This filter is mainly used for event notification, which is covered later.
DubboInvoker.doInvoke(final Invocation invocation):

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (isOneway) { // one-way: no return value
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) { // asynchronous, with return value
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else { // synchronous, with return value
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
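The three modes:
- If the call is isOneway (no return value is needed), the request is sent straight away regardless of whether it is sync or async. No Future is created, and an empty RpcResult is returned.
- If the call is isAsync (asynchronous):
  - first a ResponseFuture is created and wrapped in a FutureAdapter (the ResponseFuture is created by the same code as in the synchronous case, and the resulting object is a DefaultFuture);
  - then the FutureAdapter is stored in the current thread's context, RpcContext.getContext();
  - finally an empty RpcResult is returned.
- If the call is synchronous, a ResponseFuture is created and its get() method is called immediately, blocking until the result arrives (see the three articles referenced at the beginning of this article).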
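The branch order can be sketched with the JDK alone. This is an illustration under stated assumptions, not the Dubbo source: InvokeModeDemo, invoke, awaitStored and CONTEXT are hypothetical names, a Supplier stands in for the remote request, and a null return stands in for the empty RpcResult.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class InvokeModeDemo {

    // Stand-in for the per-thread context slot (hypothetical).
    private static final ThreadLocal<Future<String>> CONTEXT = new ThreadLocal<>();

    public static Future<String> getFuture() {
        return CONTEXT.get();
    }

    // Same branch order as described above: one-way first, then async, then sync.
    public static String invoke(Supplier<String> remoteCall, boolean oneway, boolean async) {
        if (oneway) {
            CompletableFuture.runAsync(() -> remoteCall.get()); // fire and forget
            CONTEXT.set(null);
            return null;
        } else if (async) {
            CONTEXT.set(CompletableFuture.supplyAsync(remoteCall)); // caller fetches it later
            return null;
        } else {
            CONTEXT.set(null);
            return CompletableFuture.supplyAsync(remoteCall).join(); // block for the result
        }
    }

    // Blocks on the stored future; wraps checked exceptions for brevity.
    public static String awaitStored() {
        try {
            return getFuture().get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("sync   -> " + invoke(() -> "pong", false, false));
        System.out.println("async  -> " + invoke(() -> "pong", false, true)
                + ", future -> " + awaitStored());
        System.out.println("oneway -> " + invoke(() -> "pong", true, false)
                + ", future -> " + getFuture());
    }
}
```

Only the async branch leaves a future in the slot; the other two clear it, so a stale future from an earlier async call cannot be picked up by mistake.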
A brief look at FutureAdapter:

public class FutureAdapter<V> implements Future<V> {

    private final ResponseFuture future;

    public FutureAdapter(ResponseFuture future) {
        this.future = future;
    }

    public ResponseFuture getFuture() {
        return future;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    public boolean isCancelled() {
        return false;
    }

    public boolean isDone() {
        return future.isDone();
    }

    @SuppressWarnings("unchecked")
    public V get() throws InterruptedException, ExecutionException {
        try {
            return (V) (((Result) future.get()).recreate());
        } catch (RemotingException e) {
            throw new ExecutionException(e.getMessage(), e);
        } catch (Throwable e) {
            throw new RpcException(e);
        }
    }

    @SuppressWarnings("unchecked")
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // Quoted as-is: unit.convert(x, MILLISECONDS) reads x as milliseconds and converts it into `unit`.
        int timeoutInMillis = (int) unit.convert(timeout, TimeUnit.MILLISECONDS);
        try {
            return (V) (((Result) future.get(timeoutInMillis)).recreate());
        } catch (com.alibaba.dubbo.remoting.TimeoutException e) {
            throw new TimeoutException(StringUtils.toString(e));
        } catch (RemotingException e) {
            throw new ExecutionException(e.getMessage(), e);
        } catch (Throwable e) {
            throw new RpcException(e);
        }
    }
}
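One detail in get(long timeout, TimeUnit unit) is worth checking against the Dubbo version you run: in the listing as quoted, the timeout is computed with unit.convert(timeout, TimeUnit.MILLISECONDS). The sketch below uses only java.util.concurrent.TimeUnit to show what that call shape yields compared with unit.toMillis(timeout). The class and method names are hypothetical, and nothing here calls Dubbo.

```java
import java.util.concurrent.TimeUnit;

public class TimeoutConversionDemo {

    // Same call shape as the quoted get(timeout, unit): reads `timeout`
    // as milliseconds and converts it INTO `unit`.
    public static long asQuoted(long timeout, TimeUnit unit) {
        return unit.convert(timeout, TimeUnit.MILLISECONDS);
    }

    // Converts `timeout`, expressed in `unit`, to milliseconds.
    public static long toMillis(long timeout, TimeUnit unit) {
        return unit.toMillis(timeout);
    }

    public static void main(String[] args) {
        System.out.println(asQuoted(5, TimeUnit.SECONDS));         // 0
        System.out.println(toMillis(5, TimeUnit.SECONDS));         // 5000
        System.out.println(asQuoted(3000, TimeUnit.MILLISECONDS)); // 3000
        System.out.println(toMillis(3000, TimeUnit.MILLISECONDS)); // 3000
    }
}
```

The two conversions agree only when the unit is TimeUnit.MILLISECONDS, so if your version matches the listing, passing the timeout in milliseconds is the unambiguous form.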
Finally, look back at FutureFilter:

public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
    final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);

    fireInvokeCallback(invoker, invocation);
    // need to configure if there's return value before the invocation in order to help invoker to judge if it's
    // necessary to return future.
    Result result = invoker.invoke(invocation);
    if (isAsync) {
        asyncCallback(invoker, invocation);
    } else {
        syncCallback(invoker, invocation, result);
    }
    return result;
}