asyncframework是筆者于2019年實作的一個類Spring架構@Async注解功能的異步架構,隻需要在接口上添加一個
@AsyncFunction
注解就可讓這個方法異步執行,并已釋出在筆者的Github上:(https://www.github.com/wujiuye)。
asyncframework架構的
@AsyncFunction
注解不僅支援用在無傳回值的方法上,也與Spring架構一樣,它同樣支援
@AsyncFunction
注解用在有傳回值的方法上。
但與Spring架構實作不同的是,asyncframework架構是直接基于動态位元組碼技術實作的,支援在非
spring
項目中使用,這也是我當初寫它的原因。
如果你也對位元組碼感興趣,我非常推薦你閱讀這個架構的源碼,濃縮的都是精華,十幾個類包含了設計模式的使用、位元組碼、以及架構的設計思想,對你了解
spring
的
@Async
注解實作原理也有幫助。
首先允許筆者向您介紹如何使用asyncframework,再介紹asyncframework的實作原理。
如何使用asyncframework
第一步:在Java項目中添加依賴
<dependency>
<groupId>com.github.wujiuye</groupId>
<artifactId>asyncframework</artifactId>
<version>1.2.0-RELEASE</version>
</dependency>
複制
第二步:定義接口以及編寫接口的實作類
/**
* @author wujiuye
* @version 1.0 on 2019/11/24
*/
public interface AsyncMessageSubscribe {
/**
* 異步無傳回值
*
* @param queue
*/
@AsyncFunction
void pullMessage(String queue);
/**
* 異步帶傳回值
*
* @param s1
* @param s2
* @return
*/
@AsyncFunction
AsyncResult<String> doAction(String s1, String s2);
}
複制
編寫實作類:
public class AsyncMessageSubscribe implements AsyncMessageSubscribe {
@Override
public void pullMessage(String queue) {
System.out.println(queue + ", current thread name:" + Thread.currentThread().getName());
}
@Override
public AsyncResult<String> doAction(String s1, String s2) {
return new AsyncResult<>("hello wujiuye! current thread name:" + Thread.currentThread().getName());
}
}
複制
第三步:配置全局線程池,以及使用
AsyncProxyFactory
建立代理對象
在調用
AsyncProxyFactory
的
getInterfaceImplSupporAsync
方法建立代理類執行個體時,需要指定異步執行使用哪個線程池,以及接口的實作類,代碼如下。
public class AsmProxyTest {
// 配置全局線程池
static ExecutorService executorService = Executors.newFixedThreadPool(2);
@Test
public void testAutoProxyAsync() throws Exception {
AsyncMessageSubscribe proxy = AsmProxyFactory.getInterfaceImplSupporAsync(
AsyncMessageSubscribe.class, impl, executorService);
// 異步不帶傳回值
proxy.pullMessage("wujiuye");
// 異步帶傳回值
AsyncResult<String> asyncResult = proxy.doAction("sssss", "ddd");
System.out.println(asyncResult.get());
}
}
複制
你可能會問,這還要建立代理類去調用,我直接
new
一個
Runnable
放到線程池執行不是更友善?
确實如此,但如果通過包掃描自動建立代理對象那就不一樣了,
spring
就是通過
BeanPostProcess
實作的自動配置。而且,當我們需要把異步改為同步時,隻需要去掉注解,而當想同步改異步時,也隻需要添加注解,不需要改代碼。
異步無傳回值的實作原理
我們以實作消息異步訂閱為例,介紹在不使用任何架構的情況下,如何通過靜态代理實作将訂閱消息方法由同步切換到異步,而這正是asyncframework的實作原理,asyncframework隻是将靜态代理改為動态代理。
定義消息訂閱接口:
public interface MessageSubscribeTemplate {
<T> void subscribeMessage(MessageQueueEnum messageQueue,
OnReceiveMessageCallback<T> onReceiveMessageCallback,
Class<T> tagClass);
}
複制
消息訂閱接口實作類:
public class AwsSqsMessageConsumer implements MessageSubscribeTemplate {
@Override
public <T> void subscribeMessage(MessageQueueEnum messageQueue,
OnReceiveMessageCallback<T> onReceiveMessageCallback,
Class<T> tagClass){
// 編寫實作邏輯
}
}
複制
提示:為什麼消息訂閱抽象為接口?因為當時我們經常會切換MQ架構,一開始使用RocketMQ,後面由于成本問題又切換到了AWS的SQS服務。
下面就可以通過靜态代理實作消息訂閱的同步切異步,代碼如下。
public class MessageSubscribeTemplateProxy implements MessageSubscribeTemplate {
private ExecutorService executorService;
private MessageSubscribeTemplate target;
public MessageSubscribeTemplateProxy(ExecutorService executorService,
MessageSubscribeTemplate target) {
this.target = target;
this.executorService = executorService;
}
@Override
public void subscribeMessage(MessageQueueEnum var1, OnReceiveMessageCallback var2, Class var3) {
// 實作異步調用邏輯,就是放到線程池中去執行
executorService.execute(()->this.target.subscribeMessage(var1, var2, var3));
}
}
複制
asyncframework架構就是實作動态編寫MessageSubscribeTemplateProxy代理類,以此省去同步切異步或異步切同步時修改MessageSubscribeTemplateProxy代理類的麻煩。
有了asyncframework,我們隻需要編寫消息訂閱模版的實作類即可,同步還是異步我們不必關心,當想讓訂閱方法異步執行就在方法上添加@AsyncSubscribe注解。并且支援接口多個方法,對某些方法添加注解,就隻會是這些方法實作異步執行。
異步帶傳回值的實作原理
筆者在實作支援帶傳回值的方法異步執行這個功能時,遇到了兩個大難題:
- 難點一:帶傳回值的方法如何去實作異步?
- 難點二:如何編寫位元組碼實作泛型接口的代理類?
在
spring
項目中,如果想在帶傳回值的方法上添加
@Async
注解,就需要方法傳回值類型為
AsyncResult<T>
,筆者也去看了一下
spring
的源碼,發現
AsyncResult
是一個
Future
。
思路有是有了,但僅僅隻是依靠
Future
還是實作不了的。
我們知道,
ExecutorService
的
submit
方法支援送出一個
Callable
帶傳回值的任務,并且
submit
方法傳回一個
Future
,調用這個
Future
的
get
方法目前線程會阻塞,直到任務執行結束。
是以如果我們在代理類方法中調用
Future
的get方法等待結果,再将結果包裝成
AsyncResult
傳回,這就不是異步執行了,而是同步執行了。
是以我們要解決的問題就是:代理類必須要在将異步方法送出到線程池後,就要立即傳回一個
AsyncResult
,并且要確定當外部調用這個
AsyncResult
的
get
方法時,擷取到的結果就是最終方法執行後傳回的結果。
筆者在asyncframework架構中是這樣實作的:在代理類将異步方法送出到線程池後,立即傳回一個
AsyncResult
代理對象,這個
AsyncResult
代理對象代理的是
Future
的get方法,當外部調用這個
AsyncResult
代理對象的get方法時,再去調用
Future
的get方法。
先實作
AsyncResult
,這是一個非阻塞的
Future
,因為不需要阻塞。
public class AsyncResult<T> implements Future<T> {
private T result;
public AsyncResult(T result) {
this.result = result;
}
@Override
public T get() throws InterruptedException, ExecutionException {
return result;
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return get();
}
/**
* 由位元組碼調用
*
* @param future 送出到線程池執行傳回的future
* @param <T>
* @return
*/
public static <T> AsyncResult<T> newAsyncResultProxy(final Future<AsyncResult<T>> future) {
return new AsyncResult<T>(null) {
@Override
public T get() throws InterruptedException, ExecutionException {
AsyncResult<T> asyncResult = future.get();
return asyncResult.get();
}
};
}
}
複制
newAsyncResultProxy
方法才是整個異步實作的最關鍵一步,該方法是給位元組碼生成的代理對象調用的,代理方法實際傳回結果是newAsyncResultProxy方法傳回的AsyncResult。當外部調用這個
AsyncResult
的
get
方法時,實際上是去調用
ExecutorService
的
submit
方法傳回的那個
Future
的
get
方法。對使用者屏蔽了這個阻塞擷取結果的實作過程。
還是以消息訂閱為例:
// 接口
public interface AsyncMessageSubscribe {
@AsyncFunction
AsyncResult<String> doAction(String s1, String s2);
}
// 接口實作類
public AsyncMessageSubscribe impl = new AsyncMessageSubscribe() {
@Override
public AsyncResult<String> doAction(String s1, String s2) {
return new AsyncResult<>("current thread name:" + Thread.currentThread().getName());
}
};
複制
asyncframework架構會使用動态位元組碼技術為我們自動生成将AsyncMessageSubscribe#doAction方法送出到線程池執行的Callable,代碼如下。
public static class AsyncMessageSubscribe_doActionCallable implements Callable<AsyncResult<String>> {
private AsyncMessageSubscribe target;
private String param1;
private String param2;
public AsyncMessageSubscribe_doActionCallable(AsyncMessageSubscribe var1, String var2, String var3) {
this.target = var1;
this.param1 = var2;
this.param2 = var3;
}
public AsyncResult<String> call() throws Exception {
return this.target.doAction(this.param1, this.param2);
}
}
複制
asyncframework架構使用動态位元組碼技術生成的AsyncMessageSubscribe的動态代理類如下。
public class AsyncMessageSubscribeProxy implements AsyncMessageSubscribe {
private ExecutorService executorService;
private AsyncMessageSubscribe target;
public MessageSubscribeTemplateProxy(ExecutorService executorService,
MessageSubscribeTemplate target) {
this.executorService = executorService;
this.target = target;
}
public AsyncResult<String> doAction(String s1, String s2) {
AsyncMessageSubscribe_doActionCallable callable = new AsyncMessageSubscribe_doActionCallable(target, "wujiuye", "hello");
Future result = executorService.submit(callable);
AsyncResult<String> asyncResult = AsyncResult.newAsyncResultProxy(result);
return asyncResult;
}
}
複制
在實作asyncframework中踩的動态位元組碼實作泛型接口的坑
asyncframework架構動态實作代理類異步方法的代碼源碼在
FutureFunctionHandler
這個類中。
public class FutureFunctionHandler implements AsyncFunctionHandler{
/**
* asyncMethod有傳回值,且傳回值類型為Future的處理
*
* @param classWriter 類改寫器
* @param interfaceClass 接口
* @param asyncMethod 異步方法
* @param proxyObjClass 接口的實作類
* @param executorServiceClass 線程池的類型
*/
@Override
public void doOverrideAsyncFunc(ClassWriter classWriter, Class<?> interfaceClass, Method asyncMethod, Class<?> proxyObjClass, Class<? extends ExecutorService> executorServiceClass) {
...........
// invoke submit callable
methodVisitor.visitVarInsn(ALOAD, 0);
methodVisitor.visitFieldInsn(GETFIELD, ByteCodeUtils.getProxyClassName(proxyObjClass), "executorService", Type.getDescriptor(executorServiceClass));
methodVisitor.visitVarInsn(ALOAD, index);
if (!executorServiceClass.isInterface()) {
methodVisitor.visitMethodInsn(INVOKEVIRTUAL, executorServiceClass.getName().replace(".", "/"),
"submit", ByteCodeUtils.getFuncDesc(Future.class, Callable.class), false);
} else {
methodVisitor.visitMethodInsn(INVOKEINTERFACE, executorServiceClass.getName().replace(".", "/"),
"submit", ByteCodeUtils.getFuncDesc(Future.class, Callable.class), true);
}
// 将傳回值存到操作數棧
methodVisitor.visitVarInsn(ASTORE, ++index);
// 再來一層代理,對外部屏蔽線程阻塞等待
methodVisitor.visitVarInsn(ALOAD, index);
methodVisitor.visitMethodInsn(INVOKESTATIC, AsyncResult.class.getName().replace(".", "/"),
"newAsyncResultProxy", ByteCodeUtils.getFuncDesc(AsyncResult.class, Future.class),
false);
methodVisitor.visitInsn(ARETURN);
..............
}
}
複制
線程池在調用
AsyncMessageSubscribe_doActionCallable
這個
Callable
的時候,它查找的call方法的方法描述符是()Ljava.lang.Object;。因為
Callable
是個泛型接口。
如果把實作類的簽名和實作的
call
方法的簽名改為下面這樣反而不行。
類的簽名:Ljava/lang/Object;Ljava/util/concurrent/Callable<Lcom/wujiuye/asyncframework/handler/async/AsyncResult<Ljava/lang/String;>;>;"
call方法的簽名:
()Lcom/wujiuye/asyncframework/handler/async/AsyncResult<Ljava/lang/String;>;
複制
因為泛型
<T>
編譯後的描述符是
Ljava.lang.Object;
。
如
AsyncResult
泛型類。(選部分)
public class AsyncResult<T> implements Future<T> {
private T result;
@Override
public T get() throws InterruptedException, ExecutionException {
return result;
}
}
複制
AsyncResult
泛型類編譯後的位元組碼資訊。(選部分)
public class com.wujiuye.asyncframework.handler.async.AsyncResult<T> implements java.util.concurrent.Future<T> {
private T result;
descriptor: Ljava/lang/Object;
public T get() throws java.lang.InterruptedException, java.util.concurrent.ExecutionException;
descriptor: ()Ljava/lang/Object;
Code:
0: aload_0
1: getfield #2 // Field result:Ljava/lang/Object;
4: areturn
複制
類型
T
的
descriptor(類型描述符)
為
Ljava/lang/Object;
,以及
get
方法中,
getfield
指令指定的類型描述符也是
Ljava/lang/Object;
。
Callable
接口也是泛型接口,編譯後
call
方法的方法描述符便是
()Ljava.lang.Object;
。
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
複制
是以,如果通過位元組碼實作
Callable
接口,
call
方法不要設定方法簽名,設定方法簽名意味着也要改變方法的描述符,一改變就會導緻線程池中調用這個
Callable
的
call
方法抛出抽象方法調用錯誤,原因是根據
Callable
接口的
call
方法的描述符在這個
Callable
對象的類(Class)中找不到對應的
call
方法。
A:既然
spring
都已經提供這樣的功能,你為什麼還要實作一個這樣的架構呢?
Q:因為我之前寫元件的時候有需要用到,但又不想為了使用這個功能就把spring依賴到項目中,會比較臃腫。其次,也是因為喜歡折騰,想要把自己的想法實作。
asyncframework
可以取代
spring
的
@Async
使用,隻要封裝一個
starter
包,依靠
spring
提供的
BeanPostProcess
實作無縫整合。但
spring
都已經提供了,我就不想去造輪子了,
asyncframework
我推薦是在非
spring
項目中使用。