天天看點

筆者開源的asyncframework架構是如何實作類Spring架構@Async注解功能的

asyncframework是筆者于2019年實作的一個類Spring架構@Async注解功能的異步架構,隻需要在接口上添加一個

@AsyncFunction

注解就可讓這個方法異步執行,并已釋出在筆者的Github上:(https://www.github.com/wujiuye)。

asyncframework架構的

@AsyncFunction

注解不僅支援用在無傳回值的方法上,也與Spring架構一樣,它同樣支援

@AsyncFunction

注解用在有傳回值的方法上。

但與Spring架構實作不同的是,asyncframework架構是直接基于動态位元組碼技術實作的,支援在非

spring

項目中使用,這也是我當初寫它的原因。

如果你也對位元組碼感興趣,我非常推薦你閱讀這個架構的源碼,濃縮的都是精華,十幾個類包含了設計模式的使用、位元組碼、以及架構的設計思想,對你了解

spring

@Async

注解實作原理也有幫助。

首先允許筆者向您介紹如何使用asyncframework,再介紹asyncframework的實作原理。

如何使用asyncframework

第一步:在Java項目中添加依賴

<dependency>
  <groupId>com.github.wujiuye</groupId>
  <artifactId>asyncframework</artifactId>
  <version>1.2.0-RELEASE</version>
</dependency>
           

複制

第二步:定義接口以及編寫接口的實作類

/**
 * @author wujiuye
 * @version 1.0 on 2019/11/24
 */
public interface AsyncMessageSubscribe {
    /**
     * 異步無傳回值
     *
     * @param queue
     */
    @AsyncFunction
    void pullMessage(String queue);
    /**
     * 異步帶傳回值
     *
     * @param s1
     * @param s2
     * @return
     */
    @AsyncFunction
    AsyncResult<String> doAction(String s1, String s2);
}
           

複制

編寫實作類:

public class AsyncMessageSubscribe implements AsyncMessageSubscribe {
    @Override
    public void pullMessage(String queue) {
        System.out.println(queue + ", current thread name:" + Thread.currentThread().getName());
    }
    @Override
    public AsyncResult<String> doAction(String s1, String s2) {
        return new AsyncResult<>("hello wujiuye! current thread name:" + Thread.currentThread().getName());
    }
}
           

複制

第三步:配置全局線程池,以及使用

AsyncProxyFactory

建立代理對象

在調用

AsyncProxyFactory

getInterfaceImplSupporAsync

方法建立代理類執行個體時,需要指定異步執行使用哪個線程池,以及接口的實作類,代碼如下。

public class AsmProxyTest {
   // 配置全局線程池
   static ExecutorService executorService = Executors.newFixedThreadPool(2);
   @Test
   public void testAutoProxyAsync() throws Exception {
         AsyncMessageSubscribe proxy = AsmProxyFactory.getInterfaceImplSupporAsync(
                        AsyncMessageSubscribe.class, impl, executorService);
         // 異步不帶傳回值
         proxy.pullMessage("wujiuye");
         // 異步帶傳回值
         AsyncResult<String> asyncResult = proxy.doAction("sssss", "ddd");
         System.out.println(asyncResult.get());
   }
}
           

複制

你可能會問,這還要建立代理類去調用,我直接

new

一個

Runnable

放到線程池執行不是更友善?

确實如此,但如果通過包掃描自動建立代理對象那就不一樣了,

spring

就是通過

BeanPostProcess

實作的自動配置。而且,當我們需要把異步改為同步時,隻需要去掉注解,而當想同步改異步時,也隻需要添加注解,不需要改代碼。

異步無傳回值的實作原理

我們以實作消息異步訂閱為例,介紹在不使用任何架構的情況下,如何通過靜态代理實作将訂閱消息方法由同步切換到異步,而這正是asyncframework的實作原理,asyncframework隻是将靜态代理改為動态代理。

定義消息訂閱接口:

public interface MessageSubscribeTemplate {
      <T> void subscribeMessage(MessageQueueEnum messageQueue, 
                                OnReceiveMessageCallback<T> onReceiveMessageCallback,
                                Class<T> tagClass);
}
           

複制

消息訂閱接口實作類:

public class AwsSqsMessageConsumer implements MessageSubscribeTemplate {    
    @Override
    public <T> void subscribeMessage(MessageQueueEnum messageQueue, 
                                   OnReceiveMessageCallback<T> onReceiveMessageCallback,
                                   Class<T> tagClass){
         // 編寫實作邏輯 
    }
}
           

複制

提示:為什麼消息訂閱抽象為接口?因為當時我們經常會切換MQ架構,一開始使用RocketMQ,後面由于成本問題又切換到了AWS的SQS服務。

下面就可以通過靜态代理實作消息訂閱的同步切異步,代碼如下。

public class MessageSubscribeTemplateProxy implements MessageSubscribeTemplate {
    private ExecutorService executorService; 
    private MessageSubscribeTemplate target;
  
    public MessageSubscribeTemplateProxy(ExecutorService executorService,
                                         MessageSubscribeTemplate target) {
        this.target = target;
      	this.executorService = executorService;
    }
  
    @Override
    public void subscribeMessage(MessageQueueEnum var1, OnReceiveMessageCallback var2, Class var3) {
        // 實作異步調用邏輯,就是放到線程池中去執行
         executorService.execute(()->this.target.subscribeMessage(var1, var2, var3)); 
    }
}
           

複制

asyncframework架構就是實作動态編寫MessageSubscribeTemplateProxy代理類,以此省去同步切異步或異步切同步時修改MessageSubscribeTemplateProxy代理類的麻煩。

有了asyncframework,我們隻需要編寫消息訂閱模版的實作類即可,同步還是異步我們不必關心,當想讓訂閱方法異步執行就在方法上添加@AsyncSubscribe注解。并且支援接口多個方法,對某些方法添加注解,就隻會是這些方法實作異步執行。

異步帶傳回值的實作原理

筆者在實作支援帶傳回值的方法異步執行這個功能時,遇到了兩個大難題:

  • 難點一:帶傳回值的方法如何去實作異步?
  • 難點二:如何編寫位元組碼實作泛型接口的代理類?

spring

項目中,如果想在帶傳回值的方法上添加

@Async

注解,就需要方法傳回值類型為

AsyncResult<T>

,筆者也去看了一下

spring

的源碼,發現

AsyncResult

是一個

Future

思路有是有了,但僅僅隻是依靠

Future

還是實作不了的。

我們知道,

ExecutorService

submit

方法支援送出一個

Callable

帶傳回值的任務,并且

submit

方法傳回一個

Future

,調用這個

Future

get

方法目前線程會阻塞,直到任務執行結束。

是以如果我們在代理類方法中調用

Future

的get方法等待結果,再将結果包裝成

AsyncResult

傳回,這就不是異步執行了,而是同步執行了。

是以我們要解決的問題就是:代理類必須要在将異步方法送出到線程池後,就要立即傳回一個

AsyncResult

,并且要確定當外部調用這個

AsyncResult

get

方法時,擷取到的結果就是最終方法執行後傳回的結果。

筆者在asyncframework架構中是這樣實作的:在代理類将異步方法送出到線程池後,立即傳回一個

AsyncResult

代理對象,這個

AsyncResult

代理對象代理的是

Future

的get方法,當外部調用這個

AsyncResult

代理對象的get方法時,再去調用

Future

的get方法。

先實作

AsyncResult

,這是一個非阻塞的

Future

,因為不需要阻塞。

public class AsyncResult<T> implements Future<T> {
    private T result;
    public AsyncResult(T result) {
        this.result = result;
    }    
  
    @Override
    public T get() throws InterruptedException, ExecutionException {
        return result;
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return get();
    }

    /**
     * 由位元組碼調用
     *
     * @param future 送出到線程池執行傳回的future
     * @param <T>
     * @return
     */
    public static <T> AsyncResult<T> newAsyncResultProxy(final Future<AsyncResult<T>> future) {
        return new AsyncResult<T>(null) {
            @Override
            public T get() throws InterruptedException, ExecutionException {
                AsyncResult<T> asyncResult = future.get();
                return asyncResult.get();
            }
        };
    }

}
           

複制

newAsyncResultProxy

方法才是整個異步實作的最關鍵一步,該方法是給位元組碼生成的代理對象調用的,代理方法實際傳回結果是newAsyncResultProxy方法傳回的AsyncResult。當外部調用這個

AsyncResult

get

方法時,實際上是去調用

ExecutorService

submit

方法傳回的那個

Future

get

方法。對使用者屏蔽了這個阻塞擷取結果的實作過程。

還是以消息訂閱為例:

// 接口
public interface AsyncMessageSubscribe {
    @AsyncFunction
    AsyncResult<String> doAction(String s1, String s2);
}

// 接口實作類
public AsyncMessageSubscribe impl = new AsyncMessageSubscribe() {
    @Override
    public AsyncResult<String> doAction(String s1, String s2) {
        return new AsyncResult<>("current thread name:" + Thread.currentThread().getName());
    }
};
           

複制

asyncframework架構會使用動态位元組碼技術為我們自動生成将AsyncMessageSubscribe#doAction方法送出到線程池執行的Callable,代碼如下。

public static class AsyncMessageSubscribe_doActionCallable implements Callable<AsyncResult<String>> {
    private AsyncMessageSubscribe target;
    private String param1;
    private String param2;

    public AsyncMessageSubscribe_doActionCallable(AsyncMessageSubscribe var1, String var2, String var3) {
        this.target = var1;
        this.param1 = var2;
        this.param2 = var3;
    }

    public AsyncResult<String> call() throws Exception {
        return this.target.doAction(this.param1, this.param2);
    }
}
           

複制

asyncframework架構使用動态位元組碼技術生成的AsyncMessageSubscribe的動态代理類如下。

public class AsyncMessageSubscribeProxy implements AsyncMessageSubscribe {
    private ExecutorService executorService; 
    private AsyncMessageSubscribe target;
  
    public MessageSubscribeTemplateProxy(ExecutorService executorService,
                                         MessageSubscribeTemplate target) {
        this.executorService = executorService;
        this.target = target;
    }
    
    public AsyncResult<String> doAction(String s1, String s2) {
         AsyncMessageSubscribe_doActionCallable callable = new AsyncMessageSubscribe_doActionCallable(target, "wujiuye", "hello");
         Future result = executorService.submit(callable);
         AsyncResult<String> asyncResult = AsyncResult.newAsyncResultProxy(result);
         return asyncResult;
    }
}
           

複制

在實作asyncframework中踩的動态位元組碼實作泛型接口的坑

asyncframework架構動态實作代理類異步方法的代碼源碼在

FutureFunctionHandler

這個類中。

public class FutureFunctionHandler implements AsyncFunctionHandler{
        /**
             * asyncMethod有傳回值,且傳回值類型為Future的處理
             *
             * @param classWriter          類改寫器
             * @param interfaceClass       接口
             * @param asyncMethod          異步方法
             * @param proxyObjClass        接口的實作類
             * @param executorServiceClass 線程池的類型
             */
        @Override
        public void doOverrideAsyncFunc(ClassWriter classWriter, Class<?> interfaceClass, Method asyncMethod, Class<?> proxyObjClass, Class<? extends ExecutorService> executorServiceClass) {
              ...........
            // invoke submit callable
            methodVisitor.visitVarInsn(ALOAD, 0);
            methodVisitor.visitFieldInsn(GETFIELD, ByteCodeUtils.getProxyClassName(proxyObjClass), "executorService", Type.getDescriptor(executorServiceClass));
            methodVisitor.visitVarInsn(ALOAD, index);
            if (!executorServiceClass.isInterface()) {
                methodVisitor.visitMethodInsn(INVOKEVIRTUAL, executorServiceClass.getName().replace(".", "/"),
                        "submit", ByteCodeUtils.getFuncDesc(Future.class, Callable.class), false);
            } else {
                methodVisitor.visitMethodInsn(INVOKEINTERFACE, executorServiceClass.getName().replace(".", "/"),
                        "submit", ByteCodeUtils.getFuncDesc(Future.class, Callable.class), true);
            }
            // 将傳回值存到操作數棧
            methodVisitor.visitVarInsn(ASTORE, ++index);

            // 再來一層代理,對外部屏蔽線程阻塞等待
            methodVisitor.visitVarInsn(ALOAD, index);
            methodVisitor.visitMethodInsn(INVOKESTATIC, AsyncResult.class.getName().replace(".", "/"),
                    "newAsyncResultProxy", ByteCodeUtils.getFuncDesc(AsyncResult.class, Future.class),
                    false);

            methodVisitor.visitInsn(ARETURN);
            ..............
        }
}
           

複制

線程池在調用

AsyncMessageSubscribe_doActionCallable

這個

Callable

的時候,它查找的call方法的方法描述符是()Ljava.lang.Object;。因為

Callable

是個泛型接口。

如果把實作類的簽名和實作的

call

方法的簽名改為下面這樣反而不行。

類的簽名:Ljava/lang/Object;Ljava/util/concurrent/Callable<Lcom/wujiuye/asyncframework/handler/async/AsyncResult<Ljava/lang/String;>;>;"

call方法的簽名:
()Lcom/wujiuye/asyncframework/handler/async/AsyncResult<Ljava/lang/String;>;
           

複制

因為泛型

<T>

編譯後的描述符是

Ljava.lang.Object;

AsyncResult

泛型類。(選部分)

public class AsyncResult<T> implements Future<T> {

    private T result;
    
    @Override
    public T get() throws InterruptedException, ExecutionException {
        return result;
    }

}
           

複制

AsyncResult

泛型類編譯後的位元組碼資訊。(選部分)

public class com.wujiuye.asyncframework.handler.async.AsyncResult<T> implements java.util.concurrent.Future<T> {
  private T result;
    descriptor: Ljava/lang/Object;

  public T get() throws java.lang.InterruptedException, java.util.concurrent.ExecutionException;
    descriptor: ()Ljava/lang/Object;
    Code:
       0: aload_0
       1: getfield      #2   // Field result:Ljava/lang/Object;
       4: areturn
           

複制

類型

T

descriptor(類型描述符)

Ljava/lang/Object;

,以及

get

方法中,

getfield

指令指定的類型描述符也是

Ljava/lang/Object;

Callable

接口也是泛型接口,編譯後

call

方法的方法描述符便是

()Ljava.lang.Object;

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}
           

複制

是以,如果通過位元組碼實作

Callable

接口,

call

方法不要設定方法簽名,設定方法簽名意味着也要改變方法的描述符,一改變就會導緻線程池中調用這個

Callable

call

方法抛出抽象方法調用錯誤,原因是根據

Callable

接口的

call

方法的描述符在這個

Callable

對象的類(Class)中找不到對應的

call

方法。

A:既然

spring

都已經提供這樣的功能,你為什麼還要實作一個這樣的架構呢?

Q:因為我之前寫元件的時候有需要用到,但又不想為了使用這個功能就把spring依賴到項目中,會比較臃腫。其次,也是因為喜歡折騰,想要把自己的想法實作。

asyncframework

可以取代

spring

@Async

使用,隻要封裝一個

starter

包,依靠

spring

提供的

BeanPostProcess

實作無縫整合。但

spring

都已經提供了,我就不想去造輪子了,

asyncframework

我推薦是在非

spring

項目中使用。