1、為什麼實作請求異步化需要使用servlet3
2、請求異步化後得到的好處是什麼
3、如何使用servlet3異步化
4、一些servlet3異步化壓測資料
首先說下一般http請求處理的流程:
1、容器負責接收并解析請求為httpservletrequest;
2、然後交給servlet進行業務處理;
3、最後通過httpservletresponse寫出響應。
在servlet2.x規範中,所有這些處理都是同步進行的,也就是說必須在一個線程中完成從接收請求、業務處理到響應。
此處已tomcat6舉例子,tomcat6沒有實作servlet3規範,它在處理請求時是通過如下方式實作的:
org.apache.catalina.connector.coyoteadapter#service
// recycle the wrapper request and response
if (!comet) {
request.recycle();
response.recycle();
} else {
// clear converters so that the minimum amount of memory
// is used by this processor
request.clearencoders();
response.clearencoders();
}
在請求結束時會同步進行請求的回收,也就是說請求解析、業務處理和響應必須在一個線程内完成,不能跨越線程界限。
這也就說明了必須使用實作了servlet3規範的容器進行處理,如tomcat 7.x。
2.1、更高的并發能力;
2.2、請求解析和業務處理線程池分離;
2.3、根據業務重要性對業務分級,并分級線程池;
2.4、對業務線程池進行監控、運維、降級等處理。
得益于技術的更新,在jdk7配合tomcat7壓測中獲得了不錯的性能表現。
在引入servlet3之前我們的線程模型是如下樣子的:
整個請求解析、業務處理、生成響應都是由tomcat線程池進行處理,而且都是在一個線程中處理;不能分離線程處理;比如接收到請求後交給其他線程處理,這樣不能靈活定義業務處理模型。
引入servlet3之後,我們的線程模型可以改造為如下樣子:
此處可以看到請求解析使用tomcat單線程;而解析完成後會扔到業務隊列中,由業務線程池進行處理;這種處理方式可以得到如下好處:
1、根據業務重要性對業務進行分級,然後根據分級定義線程池;
2、可以拿到業務線程池,可以進行很多的操作,比如監控、降級等。
在一個系統的發展期間,我們一般把很多服務放到一個系統中進行處理,比如庫存服務、圖書相關服務、延保服務等等;這些服務中我們可以根據其重要性對業務分級别和做一些限制:
1、可以把業務分為核心業務級别和非核心業務級别;
2、為不同級别的業務定義不同的線程池,線程池之間是隔離的;
3、根據業務量定義各級别線程池大小。
此時假設非核心業務因為資料庫連接配接池或者網絡問題抖動造成響應時間慢,不會對我們核心業務産生影響。
因為業務線程池從tomcat中分離出來,可以進行線程池的監控,比如檢視目前處理的請求有多少,是否到了負載瓶頸,到了瓶頸後可以進行業務報警等處理。
上圖是我們一個簡陋的監控圖,可實時檢視到目前處理情況:正在處理的任務有多少,隊列中等待的任務有多少;可以根據這些資料進行監控和預警。
另外我們還可以進行一些簡單的運維:
對業務線程池進行擴容,或者業務出問題時立即清空線程池防止容器崩潰等問題;而不需要等待容器重新開機(容器重新開機需要耗費數十秒甚至數幾十毫秒、而且啟動後會有預熱問題,而造成業務産生抖動)。
如果發現請求處理不過來,負載比較高,最簡單的辦法就是直接清空線程池,将老請求拒絕掉,而沒有雪崩效應。
因為業務隊列和業務線程池都是自己的,可以對這些基礎元件做很多處理,比如定制業務隊列,按照使用者級别對使用者請求排序,進階别使用者得到更高優先級的業務處理。
對于servlet3的使用,可以參考我之前的部落格:
而在我項目裡使用就比較簡單:
1、接收請求
@requestmapping("/book")
public void getbook(httpservletrequest request, @requestparam(value="skuid") final long skuid,
@requestparam(value="cat1") final integer cat1, @requestparam(value="cat2") final integer cat2) throws exception {
onelevelasynccontext.submitfuture(request, () -> bookservice.getbook(skuid, cat1, cat2));
}
通過一級業務線程池接收請求,并送出業務處理到該線程池;
2、業務線程池封裝
public void submitfuture(final httpservletrequest req, final callable<object> task) {
final string uri = req.getrequesturi();
final map<string, string[]> params = req.getparametermap();
final asynccontext asynccontext = req.startasync(); //開啟異步上下文
asynccontext.getrequest().setattribute("uri", uri);
asynccontext.getrequest().setattribute("params", params);
asynccontext.settimeout(asynctimeoutinseconds * 1000);
if(asynclistener != null) {
asynccontext.addlistener(asynclistener);
}
executor.submit(new canceledcallable(asynccontext) { //送出任務給業務線程池
@override
public object call() throws exception {
object o = task.call(); //業務處理調用
if(o == null) {
callback(asynccontext, o, uri, params); //業務完成後,響應處理
}
if(o instanceof completablefuture) {
completablefuture<object> future = (completablefuture<object>)o;
future.thenaccept(resultobject -> callback(asynccontext, resultobject, uri, params))
.exceptionally(e -> {
callback(asynccontext, "", uri, params);
return null;
});
} else if(o instanceof string) {
callback(asynccontext, o, uri, params);
return null;
}
});
private void callback(asynccontext asynccontext, object result, string uri, map<string, string[]> params) {
httpservletresponse resp = (httpservletresponse) asynccontext.getresponse();
try {
if(result instanceof string) {
write(resp, (string)result);
} else {
write(resp, jsonutils.tojson(result));
} catch (throwable e) {
resp.setstatus(httpservletresponse.sc_internal_server_error); //程式内部錯誤
try {
log.error("get info error, uri : {}, params : {}", uri, jsonutils.tojson(params), e);
} catch (exception ex) {
} finally {
asynccontext.complete();
線程池的初始化
@override
public void afterpropertiesset() throws exception {
string[] poolsizes = poolsize.split("-");
//初始線程池大小
int corepoolsize = integer.valueof(poolsizes[0]);
//最大線程池大小
int maximumpoolsize = integer.valueof(poolsizes[1]);
queue = new linkedblockingdeque<runnable>(queuecapacity);
executor = new threadpoolexecutor(
corepoolsize, maximumpoolsize,
keepalivetimeinseconds, timeunit.seconds,
queue);
executor.allowcorethreadtimeout(true);
executor.setrejectedexecutionhandler(new rejectedexecutionhandler() {
public void rejectedexecution(runnable r, threadpoolexecutor executor) {
if(r instanceof canceledcallable) {
canceledcallable cc = ((canceledcallable) r);
asynccontext asynccontext = cc.asynccontext;
if(asynccontext != null) {
try {
string uri = (string) asynccontext.getrequest().getattribute("uri");
map params = (map) asynccontext.getrequest().getattribute("params");
log.error("async request rejected, uri : {}, params : {}", uri, jsonutils.tojson(params));
} catch (exception e) {}
httpservletresponse resp = (httpservletresponse) asynccontext.getresponse();
resp.setstatus(httpservletresponse.sc_internal_server_error);
} finally {
asynccontext.complete();
}
}
if(asynclistener == null) {
asynclistener = new asynclistener() {
@override
public void oncomplete(asyncevent event) throws ioexception {
public void ontimeout(asyncevent event) throws ioexception {
asynccontext asynccontext = event.getasynccontext();
try {
string uri = (string) asynccontext.getrequest().getattribute("uri");
map params = (map) asynccontext.getrequest().getattribute("params");
log.error("async request timeout, uri : {}, params : {}", uri, jsonutils.tojson(params));
} catch (exception e) {}
httpservletresponse resp = (httpservletresponse) asynccontext.getresponse();
resp.setstatus(httpservletresponse.sc_internal_server_error);
} finally {
asynccontext.complete();
public void onerror(asyncevent event) throws ioexception {
log.error("async request error, uri : {}, params : {}", uri, jsonutils.tojson(params));
public void onstartasync(asyncevent event) throws ioexception {
};
3、業務處理
執行bookservice.getbook(skuid, cat1, cat2)進行業務處理。
4、傳回響應
在之前封裝的異步線程池上下文中直接傳回。
5、tomcat server.xml的配置
<connector port="1601" asynctimeout="10000" acceptcount="10240" maxconnections="10240" acceptorthreadcount="1" minsparethreads="1" maxthreads="1" redirectport="8443" processorcache="1024" uriencoding="utf-8" protocol="org.apache.coyote.http11.http11nioprotocol" enablelookups="false"/>
我們更新到了jdk1.8.0_51 +tomcat
8.0.26,在使用http11nio2protocol時遇到一些問題,暫時還是使用的http11nio1protocol。此處可以看到tomcat線程池我們配置了maxthreads=1,即一個線程進行請求解析。
壓測機器基本環境:32核cpu、32g記憶體;jdk1.7.0_71 + tomcat 7.0.57,服務響應時間在20ms+,使用最簡單的單個url壓測吞吐量:
1、使用同步方式壓測
siege-3.0.7]# ./src/siege -c100 -t60s -b http://***.item.jd.com/981821
transactions: 279187 hits
availability: 100.00 %
elapsed time: 59.33 secs
data transferred: 1669.41 mb
response time: 0.02 secs
transaction rate: 4705.66 trans/sec
throughput: 28.14 mb/sec
concurrency: 99.91
successful transactions: 279187
failed transactions: 0
longest transaction: 1.04
shortest transaction: 0.00
2.1、 使用servlet3異步化壓測 100并發、60秒:
siege-3.0.7]# ./src/siege -c100 -t60s -b http://***.item.jd.com/981821 .
transactions: 337998 hits
elapsed time: 59.09 secs
data transferred: 2021.07 mb
response time: 0.03 secs
transaction rate: 5720.05 trans/sec
throughput: 34.20 mb/sec
concurrency: 149.79
successful transactions: 337998
longest transaction: 1.07
2.2、使用servlet3異步化壓測 600并發、60秒:
siege-3.0.7]# ./src/siege -c600 -t60s -b http://***.item.jd.com/981821
transactions: 370985 hits
elapsed time: 59.16 secs
data transferred: 2218.32 mb
response time: 0.10 secs
transaction rate: 6270.88 trans/sec
throughput: 37.50 mb/sec
concurrency: 598.31
successful transactions: 370985
longest transaction: 1.32
可以看出異步化之後吞吐量提升了,但是響應時間長了,也就是異步化并不會提升響應時間,但是會增加吞吐量和增加我們需要的靈活性。
通過異步化我們不會獲得更快的響應時間,但是我們獲得了整體吞吐量和我們需要的靈活性:請求解析和業務處理線程池分離;根據業務重要性對業務分級,并分級線程池;對業務線程池進行監控、運維、降級等處理。
原文連結:[http://wely.iteye.com/blog/2346283]