天天看點

微服務的異步調用

微服務的異步調用

三豐 soft張三豐

微服務的異步調用

異步調用

一個可以無需等待被調用函數的傳回值就讓操作繼續進行的方法。

異步調用就是你 喊 你朋友吃飯 ,你朋友說知道了 ,待會忙完去找你 ,你就去做别的了。同步調用就是你 喊 你朋友吃飯 ,你朋友在忙 ,你就一直在那等,等你朋友忙完了 ,你們一起去。

jdk1.8之前的Future

jdk并發包裡的Future代表了未來的某個結果,當我們向線程池中送出任務的時候會傳回該對象,可以通過future獲得執行的結果,但是jdk1.8之前的Future有點雞肋,并不能實作真正的異步,需要阻塞的擷取結果,或者不斷的輪詢。

通常我們希望當線程執行完一些耗時的任務後,能夠自動的通知我們結果,很遺憾這在原生jdk1.8之前是不支援的,但是我們可以通過第三方的庫實作真正的異步回調。

public class JavaFuture {
    public static void main(String[] args) throws Throwable, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<String> f = executor.submit(new Callable<String>() {

            @Override
            public String call() throws Exception {
                System.out.println("task started!");
                longTimeMethod();
                System.out.println("task finished!");
                return "hello";
            }
        });

        //此處get()方法阻塞main線程
        System.out.println(f.get());
        System.out.println("main thread is blocked");
    }
}           

如果想獲得耗時操作的結果,可以通過get()方法擷取,但是該方法會阻塞目前線程,我們可以在做完剩下的某些工作的時候調用get()方法試圖去擷取結果。

也可以調用非阻塞的方法isDone來确定操作是否完成,isDone這種方式有點兒類似下面的過程:

微服務的異步調用

jdk1.8開始的Future

直到jdk1.8才算真正支援了異步操作,jdk1.8中提供了lambda表達式,使得java向函數式語言又靠近了一步。借助jdk原生的CompletableFuture可以實作異步的操作,同時結合lambada表達式大大簡化了代碼量。代碼例子如下:

package netty_promise;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class JavaPromise {
    public static void main(String[] args) throws Throwable, ExecutionException {
        // 兩個線程的線程池
        ExecutorService executor = Executors.newFixedThreadPool(2);
        //jdk1.8之前的實作方式
        CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                System.out.println("task started!");
                try {
                    //模拟耗時操作
                    longTimeMethod();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "task finished!";
            }
        }, executor);

        //采用lambada的實作方式
        future.thenAccept(e -> System.out.println(e + " ok"));

        System.out.println("main thread is running");
    }
}           

實作方式類似下圖:

微服務的異步調用

Spring的異步方法

先把longTimeMethod 封裝到Spring的異步方法中,這個異步方法的傳回值是Future的執行個體。這個方法一定要寫在Spring管理的類中,注意注解@Async。

@Service
public class AsynchronousService{
  @Async
  public Future springAsynchronousMethod(){
    Integer result = longTimeMethod();
    return new AsyncResult(result);
  }
}           

其他類調用這個方法。這裡注意,一定要其他的類,如果在同類中調用,是不生效的。

@Autowired
private AsynchronousService asynchronousService;

public void useAsynchronousMethod(){
    Future future = asynchronousService.springAsynchronousMethod();
    future.get(1000, TimeUnit.MILLISECONDS);
}           

其實Spring隻不過在原生的Future中進行了一次封裝,我們最終獲得的還是Future執行個體。

ThreadPoolTaskExecutor

當我們需要實作并發、異步等操作時,通常都會使用到ThreadPoolTaskExecutor。

微服務的異步調用

當一個任務被送出到線程池時,首先檢視線程池的核心線程是否都在執行任務,否就選擇一條線程執行任務,是就執行第二步。檢視核心線程池是否已滿,不滿就建立一條線程執行任務,否則執行第三步。檢視任務隊列是否已滿,不滿就将任務存儲在任務隊列中,否則執行第四步。檢視線程池是否已滿,不滿就建立一條線程執行任務,否則就按照政策處理無法執行的任務。

在ThreadPoolExecutor中表現為:

如果目前運作的線程數小于corePoolSize,那麼就建立線程來執行任務(執行時需要擷取全局鎖)。如果運作的線程大于或等于corePoolSize,那麼就把task加入BlockQueue。如果建立的線程數量大于BlockQueue的最大容量,那麼建立新線程來執行該任務。如果建立線程導緻目前運作的線程數超過maximumPoolSize,就根據飽和政策來拒絕該任務。

TaskDecorator

public interface TaskDecorator A callback interface for a decorator to be applied to any Runnable about to be executed. Note that such a decorator is not necessarily being applied to the user-supplied Runnable/Callable but rather to the actual execution callback (which may be a wrapper around the user-supplied task). The primary use case is to set some execution context around the task's invocation, or to provide some monitoring/statistics for task execution.

意思就是說這是一個執行回調方法的裝飾器,主要應用于傳遞上下文,或者提供任務的監控/統計資訊。看上去正好可以應用于我們這種場景。多線程的場景下要多注意。

解決方案

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskDecorator;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

import javax.annotation.Nonnull;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
    @Bean("ttlExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 設定線程池核心容量
        executor.setCorePoolSize(20);
        // 設定線程池最大容量
        executor.setMaxPoolSize(100);
        // 設定任務隊列長度
        executor.setQueueCapacity(200);
        // 設定線程逾時時間
        executor.setKeepAliveSeconds(60);
        // 設定線程名稱字首
        executor.setThreadNamePrefix("ttl-executor-");
        // 設定任務丢棄後的處理政策
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 設定任務的裝飾
        executor.setTaskDecorator(new ContextCopyingDecorator());
        executor.initialize();
        return executor;
    }

    static class ContextCopyingDecorator implements TaskDecorator {
        @Nonnull
        @Override
        public Runnable decorate(@Nonnull Runnable runnable) {
            RequestAttributes context = RequestContextHolder.currentRequestAttributes();
            SecurityContext securityContext = SecurityContextHolder.getContext();
            return () -> {
                try {
                    RequestContextHolder.setRequestAttributes(context);
                    SecurityContextHolder.setContext(securityContext);
                    runnable.run();
                } finally {
                    SecurityContextHolder.clearContext();
                    RequestContextHolder.resetRequestAttributes();
                }
            };
        }
    }
}