微服務的異步調用
三豐 soft張三豐

異步調用
一個可以無需等待被調用函數的傳回值就讓操作繼續進行的方法。
異步調用就是你 喊 你朋友吃飯 ,你朋友說知道了 ,待會忙完去找你 ,你就去做别的了。同步調用就是你 喊 你朋友吃飯 ,你朋友在忙 ,你就一直在那等,等你朋友忙完了 ,你們一起去。
jdk1.8之前的Future
jdk并發包裡的Future代表了未來的某個結果,當我們向線程池中送出任務的時候會傳回該對象,可以通過future獲得執行的結果,但是jdk1.8之前的Future有點雞肋,并不能實作真正的異步,需要阻塞的擷取結果,或者不斷的輪詢。
通常我們希望當線程執行完一些耗時的任務後,能夠自動的通知我們結果,很遺憾這在原生jdk1.8之前是不支援的,但是我們可以通過第三方的庫實作真正的異步回調。
public class JavaFuture {
public static void main(String[] args) throws Throwable, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<String> f = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("task started!");
longTimeMethod();
System.out.println("task finished!");
return "hello";
}
});
//此處get()方法阻塞main線程
System.out.println(f.get());
System.out.println("main thread is blocked");
}
}
如果想獲得耗時操作的結果,可以通過get()方法擷取,但是該方法會阻塞目前線程,我們可以在做完剩下的某些工作的時候調用get()方法試圖去擷取結果。
也可以調用非阻塞的方法isDone來确定操作是否完成,isDone這種方式有點兒類似下面的過程:
jdk1.8開始的Future
直到jdk1.8才算真正支援了異步操作,jdk1.8中提供了lambda表達式,使得java向函數式語言又靠近了一步。借助jdk原生的CompletableFuture可以實作異步的操作,同時結合lambada表達式大大簡化了代碼量。代碼例子如下:
package netty_promise;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
public class JavaPromise {
public static void main(String[] args) throws Throwable, ExecutionException {
// 兩個線程的線程池
ExecutorService executor = Executors.newFixedThreadPool(2);
//jdk1.8之前的實作方式
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
System.out.println("task started!");
try {
//模拟耗時操作
longTimeMethod();
} catch (InterruptedException e) {
e.printStackTrace();
}
return "task finished!";
}
}, executor);
//采用lambada的實作方式
future.thenAccept(e -> System.out.println(e + " ok"));
System.out.println("main thread is running");
}
}
實作方式類似下圖:
Spring的異步方法
先把longTimeMethod 封裝到Spring的異步方法中,這個異步方法的傳回值是Future的執行個體。這個方法一定要寫在Spring管理的類中,注意注解@Async。
@Service
public class AsynchronousService{
@Async
public Future springAsynchronousMethod(){
Integer result = longTimeMethod();
return new AsyncResult(result);
}
}
其他類調用這個方法。這裡注意,一定要其他的類,如果在同類中調用,是不生效的。
@Autowired
private AsynchronousService asynchronousService;
public void useAsynchronousMethod(){
Future future = asynchronousService.springAsynchronousMethod();
future.get(1000, TimeUnit.MILLISECONDS);
}
其實Spring隻不過在原生的Future中進行了一次封裝,我們最終獲得的還是Future執行個體。
ThreadPoolTaskExecutor
當我們需要實作并發、異步等操作時,通常都會使用到ThreadPoolTaskExecutor。
當一個任務被送出到線程池時,首先檢視線程池的核心線程是否都在執行任務,否就選擇一條線程執行任務,是就執行第二步。檢視核心線程池是否已滿,不滿就建立一條線程執行任務,否則執行第三步。檢視任務隊列是否已滿,不滿就将任務存儲在任務隊列中,否則執行第四步。檢視線程池是否已滿,不滿就建立一條線程執行任務,否則就按照政策處理無法執行的任務。
在ThreadPoolExecutor中表現為:
如果目前運作的線程數小于corePoolSize,那麼就建立線程來執行任務(執行時需要擷取全局鎖)。如果運作的線程大于或等于corePoolSize,那麼就把task加入BlockQueue。如果建立的線程數量大于BlockQueue的最大容量,那麼建立新線程來執行該任務。如果建立線程導緻目前運作的線程數超過maximumPoolSize,就根據飽和政策來拒絕該任務。
TaskDecorator
public interface TaskDecorator A callback interface for a decorator to be applied to any Runnable about to be executed. Note that such a decorator is not necessarily being applied to the user-supplied Runnable/Callable but rather to the actual execution callback (which may be a wrapper around the user-supplied task). The primary use case is to set some execution context around the task's invocation, or to provide some monitoring/statistics for task execution.
意思就是說這是一個執行回調方法的裝飾器,主要應用于傳遞上下文,或者提供任務的監控/統計資訊。看上去正好可以應用于我們這種場景。多線程的場景下要多注意。
解決方案
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskDecorator;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import javax.annotation.Nonnull;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Bean("ttlExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 設定線程池核心容量
executor.setCorePoolSize(20);
// 設定線程池最大容量
executor.setMaxPoolSize(100);
// 設定任務隊列長度
executor.setQueueCapacity(200);
// 設定線程逾時時間
executor.setKeepAliveSeconds(60);
// 設定線程名稱字首
executor.setThreadNamePrefix("ttl-executor-");
// 設定任務丢棄後的處理政策
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 設定任務的裝飾
executor.setTaskDecorator(new ContextCopyingDecorator());
executor.initialize();
return executor;
}
static class ContextCopyingDecorator implements TaskDecorator {
@Nonnull
@Override
public Runnable decorate(@Nonnull Runnable runnable) {
RequestAttributes context = RequestContextHolder.currentRequestAttributes();
SecurityContext securityContext = SecurityContextHolder.getContext();
return () -> {
try {
RequestContextHolder.setRequestAttributes(context);
SecurityContextHolder.setContext(securityContext);
runnable.run();
} finally {
SecurityContextHolder.clearContext();
RequestContextHolder.resetRequestAttributes();
}
};
}
}
}