天天看點

spring boot--使用異步請求,提高系統的吞吐量

前言:

在我們的實際生産中,常常會遇到下面的這種情況,某個請求非常耗時(大約5s傳回),當大量的通路該請求的時候,再請求其他服務時,會造成沒有連接配接使用的情況,造成這種現象的主要原因是,我們的容器(tomcat)中線程的數量是一定的,例如500個,當這500個線程都用來請求服務的時候,再有請求進來,就沒有多餘的連接配接可用了,隻能拒絕連接配接。要是我們在請求耗時服務的時候,能夠異步請求(請求到controller中時,則容器線程直接傳回,然後使用系統内部的線程來執行耗時的服務,等到服務有傳回的時候,再将請求傳回給用戶端),那麼系統的吞吐量就會得到很大程度的提升了。當然,大家可以直接使用Hystrix的資源隔離來實作,今天我們的重點是spring mvc是怎麼來實作這種異步請求的。

一、使用Callable來實作

controller如下:

@RestController
public class HelloController {

	private static final Logger logger = LoggerFactory.getLogger(HelloController.class);
	
	@Autowired
	private HelloService hello;

	@GetMapping("/helloworld")
	public String helloWorldController() {
		return hello.sayHello();
	}

	/**
	 * 異步調用restful
	 * 當controller傳回值是Callable的時候,springmvc就會啟動一個線程将Callable交給TaskExecutor去處理
	 * 然後DispatcherServlet還有所有的spring攔截器都退出主線程,然後把response保持打開的狀态
	 * 當Callable執行結束之後,springmvc就會重新啟動配置設定一個request請求,然後DispatcherServlet就重新
	 * 調用和處理Callable異步執行的傳回結果, 然後傳回視圖
	 * 
	 * @return
	 */
	@GetMapping("/hello")
	public Callable<String> helloController() {
		logger.info(Thread.currentThread().getName() + " 進入helloController方法");
		Callable<String> callable = new Callable<String>() {

			@Override
			public String call() throws Exception {
				logger.info(Thread.currentThread().getName() + " 進入call方法");
				String say = hello.sayHello();
				logger.info(Thread.currentThread().getName() + " 從helloService方法傳回");
				return say;
			}
		};
		logger.info(Thread.currentThread().getName() + " 從helloController方法傳回");
		return callable;
	}
}
           

我們首先來看下上面這兩個請求的差別

下面這個是沒有使用異步請求的

2017-12-07 18:05:42.351  INFO 3020 --- [nio-8060-exec-5] c.travelsky.controller.HelloController   : http-nio-8060-exec-5 進入helloWorldController方法
2017-12-07 18:05:42.351  INFO 3020 --- [nio-8060-exec-5] com.travelsky.service.HelloService       : http-nio-8060-exec-5 進入sayHello方法!
2017-12-07 18:05:44.351  INFO 3020 --- [nio-8060-exec-5] c.travelsky.controller.HelloController   : http-nio-8060-exec-5 從helloWorldController方法傳回
           

我們可以看到,請求從頭到尾都隻有一個線程,并且整個請求耗費了2s鐘的時間。

下面,我們再來看下使用Callable異步請求的結果:

2017-12-07 18:11:55.671  INFO 6196 --- [nio-8060-exec-1] c.travelsky.controller.HelloController   : http-nio-8060-exec-1 進入helloController方法
2017-12-07 18:11:55.672  INFO 6196 --- [nio-8060-exec-1] c.travelsky.controller.HelloController   : http-nio-8060-exec-1 從helloController方法傳回
2017-12-07 18:11:55.676  INFO 6196 --- [nio-8060-exec-1] c.t.i.MyAsyncHandlerInterceptor          : http-nio-8060-exec-1 進入afterConcurrentHandlingStarted方法
2017-12-07 18:11:55.676  INFO 6196 --- [      MvcAsync1] c.travelsky.controller.HelloController   : MvcAsync1 進入call方法
2017-12-07 18:11:55.676  INFO 6196 --- [      MvcAsync1] com.travelsky.service.HelloService       : MvcAsync1 進入sayHello方法!
2017-12-07 18:11:57.677  INFO 6196 --- [      MvcAsync1] c.travelsky.controller.HelloController   : MvcAsync1 從helloService方法傳回
2017-12-07 18:11:57.721  INFO 6196 --- [nio-8060-exec-2] c.t.i.MyAsyncHandlerInterceptor          : http-nio-8060-exec-2服務調用完成,傳回結果給用戶端
           

從上面的結果中,我們可以看出,容器的線程http-nio-8060-exec-1這個線程進入controller之後,就立即傳回了,具體的服務調用是通過MvcAsync2這個線程來做的,當服務執行完要傳回後,容器會再啟一個新的線程http-nio-8060-exec-2來将結果傳回給用戶端或浏覽器,整個過程response都是打開的,當有傳回的時候,再從server端推到response中去。

1、異步調用的另一種方式

上面的示例是通過callable來實作的異步調用,其實還可以通過WebAsyncTask,也能實作異步調用,下面看示例:

@RestController
public class HelloController {

	private static final Logger logger = LoggerFactory.getLogger(HelloController.class);
	
	@Autowired
	private HelloService hello;

		/**
	 * 帶逾時時間的異步請求 通過WebAsyncTask自定義用戶端逾時間
	 * 
	 * @return
	 */
	@GetMapping("/world")
	public WebAsyncTask<String> worldController() {
		logger.info(Thread.currentThread().getName() + " 進入helloController方法");

		// 3s鐘沒傳回,則認為逾時
		WebAsyncTask<String> webAsyncTask = new WebAsyncTask<>(3000, new Callable<String>() {

			@Override
			public String call() throws Exception {
				logger.info(Thread.currentThread().getName() + " 進入call方法");
				String say = hello.sayHello();
				logger.info(Thread.currentThread().getName() + " 從helloService方法傳回");
				return say;
			}
		});
		logger.info(Thread.currentThread().getName() + " 從helloController方法傳回");

		webAsyncTask.onCompletion(new Runnable() {

			@Override
			public void run() {
				logger.info(Thread.currentThread().getName() + " 執行完畢");
			}
		});

		webAsyncTask.onTimeout(new Callable<String>() {

			@Override
			public String call() throws Exception {
				logger.info(Thread.currentThread().getName() + " onTimeout");
				// 逾時的時候,直接抛異常,讓外層統一處理逾時異常
				throw new TimeoutException("調用逾時");
			}
		});
		return webAsyncTask;
	}

	/**
	 * 異步調用,異常處理,詳細的處理流程見MyExceptionHandler類
	 * 
	 * @return
	 */
	@GetMapping("/exception")
	public WebAsyncTask<String> exceptionController() {
		logger.info(Thread.currentThread().getName() + " 進入helloController方法");
		Callable<String> callable = new Callable<String>() {

			@Override
			public String call() throws Exception {
				logger.info(Thread.currentThread().getName() + " 進入call方法");
				throw new TimeoutException("調用逾時!");
			}
		};
		logger.info(Thread.currentThread().getName() + " 從helloController方法傳回");
		return new WebAsyncTask<>(20000, callable);
	}

}
           

運作結果如下:

2017-12-07 19:10:26.582  INFO 6196 --- [nio-8060-exec-4] c.travelsky.controller.HelloController   : http-nio-8060-exec-4 進入helloController方法
2017-12-07 19:10:26.585  INFO 6196 --- [nio-8060-exec-4] c.travelsky.controller.HelloController   : http-nio-8060-exec-4 從helloController方法傳回
2017-12-07 19:10:26.589  INFO 6196 --- [nio-8060-exec-4] c.t.i.MyAsyncHandlerInterceptor          : http-nio-8060-exec-4 進入afterConcurrentHandlingStarted方法
2017-12-07 19:10:26.591  INFO 6196 --- [      MvcAsync2] c.travelsky.controller.HelloController   : MvcAsync2 進入call方法
2017-12-07 19:10:26.591  INFO 6196 --- [      MvcAsync2] com.travelsky.service.HelloService       : MvcAsync2 進入sayHello方法!
2017-12-07 19:10:28.591  INFO 6196 --- [      MvcAsync2] c.travelsky.controller.HelloController   : MvcAsync2 從helloService方法傳回
2017-12-07 19:10:28.600  INFO 6196 --- [nio-8060-exec-5] c.t.i.MyAsyncHandlerInterceptor          : http-nio-8060-exec-5服務調用完成,傳回結果給用戶端
2017-12-07 19:10:28.601  INFO 6196 --- [nio-8060-exec-5] c.travelsky.controller.HelloController   : http-nio-8060-exec-5 執行完畢
           

這種方式和上面的callable方式最大的差別就是,WebAsyncTask支援逾時,并且還提供了兩個回調函數,分别是onCompletion和onTimeout,顧名思義,這兩個回調函數分别在執行完成和逾時的時候回調。

3、Deferred方式實作異步調用

在我們是生産中,往往會遇到這樣的情景,controller中調用的方法很多都是和第三方有關的,例如JMS,定時任務,隊列等,拿JMS來說,比如controller裡面的服務需要從JMS中拿到傳回值,才能給用戶端傳回,而從JMS拿值這個過程也是異步的,這個時候,我們就可以通過Deferred來實作整個的異步調用。

首先,我們來模拟一個長時間調用的任務,代碼如下:

@Component
public class LongTimeTask {
	private final Logger logger = LoggerFactory.getLogger(this.getClass());
	@Async
	public void execute(DeferredResult<String> deferred){
		logger.info(Thread.currentThread().getName() + "進入 taskService 的 execute方法");
		try {
			// 模拟長時間任務調用,睡眠2s
			TimeUnit.SECONDS.sleep(2);
			// 2s後給Deferred發送成功消息,告訴Deferred,我這邊已經處理完了,可以傳回給用戶端了
			deferred.setResult("world");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}
           

接着,我們就來實作異步調用,controller如下:

@RestController
public class AsyncDeferredController {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final LongTimeTask taskService;
    
    @Autowired
    public AsyncDeferredController(LongTimeTask taskService) {
        this.taskService = taskService;
    }
    
    @GetMapping("/deferred")
    public DeferredResult<String> executeSlowTask() {
        logger.info(Thread.currentThread().getName() + "進入executeSlowTask方法");
        DeferredResult<String> deferredResult = new DeferredResult<>();
        // 調用長時間執行任務
        taskService.execute(deferredResult);
        // 當長時間任務中使用deferred.setResult("world");這個方法時,會從長時間任務中傳回,繼續controller裡面的流程
        logger.info(Thread.currentThread().getName() + "從executeSlowTask方法傳回");
        // 逾時的回調方法
        deferredResult.onTimeout(new Runnable(){
		
			@Override
			public void run() {
				logger.info(Thread.currentThread().getName() + " onTimeout");
				// 傳回逾時資訊
				deferredResult.setErrorResult("time out!");
			}
		});
        
        // 處理完成的回調方法,無論是逾時還是處理成功,都會進入這個回調方法
        deferredResult.onCompletion(new Runnable(){
		
			@Override
			public void run() {
				logger.info(Thread.currentThread().getName() + " onCompletion");
			}
		});
        
        return deferredResult;
    }
}
           

執行結果如下:

2017-12-07 19:25:40.192  INFO 6196 --- [nio-8060-exec-7] c.t.controller.AsyncDeferredController   : http-nio-8060-exec-7進入executeSlowTask方法
2017-12-07 19:25:40.193  INFO 6196 --- [nio-8060-exec-7] .s.a.AnnotationAsyncExecutionInterceptor : No TaskExecutor bean found for async processing
2017-12-07 19:25:40.194  INFO 6196 --- [nio-8060-exec-7] c.t.controller.AsyncDeferredController   : http-nio-8060-exec-7從executeSlowTask方法傳回
2017-12-07 19:25:40.198  INFO 6196 --- [nio-8060-exec-7] c.t.i.MyAsyncHandlerInterceptor          : http-nio-8060-exec-7 進入afterConcurrentHandlingStarted方法
2017-12-07 19:25:40.202  INFO 6196 --- [cTaskExecutor-1] com.travelsky.controller.LongTimeTask    : SimpleAsyncTaskExecutor-1進入 taskService 的 execute方法
2017-12-07 19:25:42.212  INFO 6196 --- [nio-8060-exec-8] c.t.i.MyAsyncHandlerInterceptor          : http-nio-8060-exec-8服務調用完成,傳回結果給用戶端
2017-12-07 19:25:42.213  INFO 6196 --- [nio-8060-exec-8] c.t.controller.AsyncDeferredController   : http-nio-8060-exec-8 onCompletion
           

從上面的執行結果不難看出,容器線程會立刻傳回,應用程式使用線程池裡面的cTaskExecutor-1線程來完成長時間任務的調用,當調用完成後,容器又啟了一個連接配接線程,來傳回最終的執行結果。

這種異步調用,在容器線程資源非常寶貴的時候,能夠大大的提高整個系統的吞吐量。

ps:異步調用可以使用AsyncHandlerInterceptor進行攔截,使用示例如下:

@Component
public class MyAsyncHandlerInterceptor implements AsyncHandlerInterceptor {
	
	private static final Logger logger = LoggerFactory.getLogger(MyAsyncHandlerInterceptor.class);

	@Override
	public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
			throws Exception {
		return true;
	}

	@Override
	public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
			ModelAndView modelAndView) throws Exception {
//		HandlerMethod handlerMethod = (HandlerMethod) handler;
		logger.info(Thread.currentThread().getName()+ "服務調用完成,傳回結果給用戶端");
	}

	@Override
	public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)
			throws Exception {
		if(null != ex){
			System.out.println("發生異常:"+ex.getMessage());
		}
	}

	@Override
	public void afterConcurrentHandlingStarted(HttpServletRequest request, HttpServletResponse response, Object handler)
			throws Exception {
		
		// 攔截之後,重新寫回資料,将原來的hello world換成如下字元串
		String resp = "my name is chhliu!";
		response.setContentLength(resp.length());
		response.getOutputStream().write(resp.getBytes());
		
		logger.info(Thread.currentThread().getName() + " 進入afterConcurrentHandlingStarted方法");
	}

}
           

有興趣的可以了解下,本篇部落格的主題是異步調用,其他的相關知識點,會在下一篇部落格中進行講解。

繼續閱讀