源碼分析基于:OkHttp3.14.7進行。
對于Android開發者而言,網絡請求不可避免。我們使用的最多的就是Square的OkHttp3架構。關于這個架構使用的時間很長,正好前一段時間有點時間,就利用業餘時間仔細閱讀了該架構的源碼,現在就把閱讀過程中自己的感悟寫下來分享給大家。
一、使用場景:
對于OkHttp3有兩種網絡請求的方式,一種是同步請求的方式;一種就是異步請求的方式。
1.同步請求:
示例代碼如下:
private void synGet() {
final OkHttpClient client = new OkHttpClient();
final Request request = new Request.Builder()
.url("https://publicobject.com/helloworld.txt")
.build();
mHttpPool.execute(new Runnable() {
@Override
public void run() {
try {
final Response response = client.newCall(request).execute();
if (!response.isSuccessful())
throw new IOException("Unexpected code " + response);
Headers responseHeaders = response.headers();
for (int i = 0; i < responseHeaders.size(); i++) {
System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
}
System.out.println(response.body().string());
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
對于同步請求是将請求直接放進執行隊列進行執行。
2.異步請求:
private void asynGet() {
final OkHttpClient client = new OkHttpClient();
final Request request = new Request.Builder()
.url("https://publicobject.com/helloworld.txt")
.build();
final Call call = client.newCall(request);
//
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
//失敗
e.printStackTrace();
}
@Override
public void onResponse(Call call, Response response) throws IOException {
final ResponseBody body = response.body();
if (!response.isSuccessful()) {
throw new IOException("Unexcepted code " + response);
}
//請求頭
final Headers headers = response.headers();
final int count = headers.size();
for (int i = 0; i < count; i++) {
Log.d(TAG, headers.name(i) + " : " + headers.value(i));
}
Log.d(TAG, "result : " + body.string());
}
});
}
異步請求是将網絡請求添加進入隊列等待執行。
二、請求邏輯分析:
我們既然使用了OkHttp3中的這兩種網絡請求。那麼我們就需要配知道它是怎麼做到的,怎麼講使用者的請求 參數進行的封裝,怎麼将使用者的請求進行的傳遞,以及怎麼講網絡請求傳遞給伺服器,以及需不需要握手啊等等吧,在這裡就不進行逐一列舉了。
1.同步請求源碼邏輯分析:
我們先從同步請求說起吧。具體是的使用方法見第一部分代碼示例,在這裡就不再說明。我們直接看線上程中執行的執行網絡請求擷取伺服器相應的代碼:“Response response = client.newCall(request).execute()”,在這段代碼中首先是講Request作為參數擷取的RealCall對象,Real是Call接口的實作類,一方面持有已準備就緒的特定網絡請求(Request)資訊,另一方面持有Transmitter對象驅動網絡請求并将本次網絡請求(call)添加到請求分發器(Dispatcher)的集合中。同時,RealCall中添加了網絡請求相關的攔截器。然後調用了RealCall中的execute()方法,擷取的伺服器相應資料。
我們接下來看一下execute()方法的源碼:
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.timeoutEnter();
transmitter.callStart();
try {
client.dispatcher().executed(this);
return getResponseWithInterceptorChain();
} finally {
client.dispatcher().finished(this);
}
}
該方法主要邏輯如下:
1>保證每一個call隻能執行一次,通過标志變量标記執行的狀态,當已經執行過就會抛出異常;
2>網絡請求的執行:client.dispatcher().executed(this)是真正的執行該網絡請求,通過OkHttpClient擷取Dispatcher對象,關于Dispatcher我們不在這裡解釋,隻需要知道它是網絡請求執行的政策就可以了。
3>添加攔截器和擷取響應:最終我們通過getResponseWithInterceptorChain()方法為該call添加各種攔截器和擷取請求的響應;
4>最終通過Dispatcher的對象調用finish方法,進行網絡請求完成的相關操作。
關于 添加攔截器以及擷取相應源碼如下:
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
//失敗重試和重定向攔截器;
interceptors.add(new RetryAndFollowUpInterceptor(client));
//負責将使用者請求轉換為伺服器請求,并傳回使用者友好的資料響應;
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//緩存相關攔截器;
interceptors.add(new CacheInterceptor(client.internalCache()));
//和伺服器建立連接配接的攔截器;
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
//向伺服器發送網絡請求的攔截器;
interceptors.add(new CallServerInterceptor(forWebSocket));
//攔截器調用鍊;
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
boolean calledNoMoreExchanges = false;
try {
//執行網絡請求
Response response = chain.proceed(originalRequest);
if (transmitter.isCanceled()) {
closeQuietly(response);
throw new IOException("Canceled");
}
return response;
} catch (IOException e) {
calledNoMoreExchanges = true;
throw transmitter.noMoreExchanges(e);
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null);
}
}
}
我們進行的每一個網絡請求都是通過該方法進行相應攔截器的添加邏輯。關于攔截器相關的代碼解析我們會在後面的部門進行。
2.異步網絡請求源碼邏輯分析:
關于使用OkHttp進行異步請求網絡資料的代碼這裡就不再多說了。它也是首先獲得RealCall對象,然後通過RealCall的enqueue()方法進行的。下面我們就來分析一下enqueue()方法的源碼:
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.callStart();
//将異步請求添加進隊列等待執行;
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
該方法是将AsyncCall(異步請求)添加進隊列等待執行;下面通過Dispatcher的enqueue()方法的将AsyncCall添加進隊列(readyAsyncCalls準備就緒的隊列),并通過promoteAndExecute()方法将準備就緒的Call從readyAsyncCalls中添加進runningAsyncCalls(正在執行的隊列)中,并進行執行。我們看一下promoteAndExecute()方法的源碼:
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
從上面我們看到最終是調用AsyncCall的executeOn()方法執行的,我們通過AsyncCall的源碼知道,其本質就是一個線程,其executeOn()方法的源碼如下:
/**
* Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
* if the executor has been shut down by reporting the call as failed.
*/
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
//調用該線程的run()方法執行。
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
transmitter.noMoreExchanges(ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
//執行完成
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
在該方法中,通過executorService運作了該線程,我們通過代碼知道AsyncCall繼承于NamedRunnable,在其父類中的run()方法中調用了execute()方法,我們通過AsyncCall的該方法中的代碼中知道, 最終也調用了該方法getResponseWithInterceptorChain()添加攔截器和擷取網絡請求的相應(Response).關于getResponseWithInterceptorChain()方法中的邏輯分析,請看同步請求部分。
三、攔截器
我們通過第二部分了解到使用OkHttp進行網絡請求需要添加一些列的攔截器,關于這些攔截器的作用就是這部分的主要内容,也是OkHttp這個架構的重點。我們回歸到第二部分中的getResponseWithInterceptorChain()方法,在該方法中建立了一個RealInterceptorChain對象chain,而攔截器數組就是作為其參數,看來該類就是主要處理攔截器的邏輯的。下面通過chain對象執行chain.proceed(originalRequest)方法擷取了Response對象。下面我們就看一下proceed()方法,看一看OkHttp是怎麼處理這些攔截器的。源碼如下:
@Override public Response proceed(Request request) throws IOException {
return proceed(request, transmitter, exchange);
}
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
//如果我們已經有一個資料流,确定即将到來的Request會使用它;
if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
//保證該攔截器隻調用過一次;
if (this.exchange != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
//擷取并調用下一個攔截器;
RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
Interceptor interceptor = interceptors.get(index);
//執行對應攔截器的intercept()方法
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
//確定下一個攔截器沒有調用過;
if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}
return response;
}
該方法確定攔截器鍊中的攔截器能準确無誤的逐一執行,也就是責任鍊模式的展現;
1.RetryAndFollowUpInterceptor:失敗和重定向攔截器;
該攔截器從連接配接失敗中恢複,并根據需要進行重定向。如果網絡請求取消了有可能會抛出IOException.
重定向:通過各種方法将各種網絡請求重新定個方向轉到其它位置(如:網頁重定向、域名的重定向、路由選擇的變化也是對資料封包經由路徑的一種重定向)。
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Transmitter transmitter = realChain.transmitter();
int followUpCount = 0;
Response priorResponse = null;
while (true) {
//攜帶請求資訊建立資料流,存在則複用;
transmitter.prepareToConnect(request);
if (transmitter.isCanceled()) {
throw new IOException("Canceled");
}
Response response;
boolean success = false;
try {
//執行網絡請求,并執行下一個攔截器
response = realChain.proceed(request, transmitter, null);
success = true;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
//報告并不再嘗試去恢複故障後的與伺服器之間的通信;
if (!recover(e.getLastConnectException(), transmitter, false, request)) {
throw e.getFirstConnectException();
}
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
//報告并可能嘗試去恢複故障後的與伺服器之間的通信;
if (!recover(e, transmitter, requestSendStarted, request)) throw e;
continue;
} finally {
// The network call threw an exception. Release any resources.
if (!success) {
transmitter.exchangeDoneDueToException();
}
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
Exchange exchange = Internal.instance.exchange(response);
Route route = exchange != null ? exchange.connection().route() : null;
//失敗重試以及重定向
Request followUp = followUpRequest(response, route);
if (followUp == null) {
if (exchange != null && exchange.isDuplex()) {
transmitter.timeoutEarlyExit();
}
return response;
}
RequestBody followUpBody = followUp.body();
if (followUpBody != null && followUpBody.isOneShot()) {
return response;
}
closeQuietly(response.body());
if (transmitter.hasExchange()) {
exchange.detachWithViolence();
}
//進行次數限制
if (++followUpCount > MAX_FOLLOW_UPS) {
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
request = followUp;
priorResponse = response;
}
}
關于這裡怎麼進行重試以及重定向的沒有了解其思路。
2.BridgeInterceptor:
該攔截器從應用代碼到網絡代碼的橋梁。首先它根據使用者請求轉化為網絡請求。然後它會執行網絡請求。最終它會将網絡響應轉化為使用者響應。
@Override public Response intercept(Chain chain) throws IOException {
//使用者請求
Request userRequest = chain.request();
//建立新的請求
Request.Builder requestBuilder = userRequest.newBuilder();
//請求體
RequestBody body = userRequest.body();
if (body != null) {
//請求體添加Content-Type
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
//請求體添加Content-Length
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
//添加Host
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
//添加Header:Connection
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
//添加Header:Accept-Encoding,如果添加了"Accept-Encoding: gzip"header,則需要解壓縮傳輸流;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
//添加Header:Cookie
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
//添加Header:User-Agent
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
Response networkResponse = chain.proceed(requestBuilder.build());
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
//生成新的Response
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
該方法就是起到了将使用者請求轉化為網絡請求的作用,并将網絡的響應轉化為使用者響應。
3.CacheInterceptor:緩存攔截器
該攔截器将請求從緩存中取出,并将響應寫入緩存;
@Override public Response intercept(Chain chain) throws IOException {
//如果存在緩存,則擷取存儲的Response;
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
//緩存政策
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
//跟蹤滿足目前政策的響應(Response)
if (cache != null) {
cache.trackResponse(strategy);
}
//緩存不适用則關閉
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
// If we're forbidden from using the network and the cache is insufficient, fail.
//如果禁用網絡并且緩存不足則失敗~
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// If we don't need the network, we're done.
//如果不使用網絡,則傳回body為nul的Response;
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
//更新緩存
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
4.ConnectInterceptor:
該攔截器負責建立一個指向指定服務的連接配接并執行下一個攔截器;
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
Transmitter transmitter = realChain.transmitter();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//建立新的Exchange對象,并建立與伺服器的連接配接。
Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
return realChain.proceed(request, transmitter, exchange);
}
方法中Exchange是用來傳遞單一的Request和Response對,具體的連接配接管理和事件相關是在ExchangeCodec層中。具體的連接配接建立邏輯還需要分析transmitter.newExchange()方法,這部分将在後面的分析中介紹。
5.CallServerInterceptor:
這是最後一個攔截器,它負責向伺服器發送一個指定的網絡請求。
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Exchange exchange = realChain.exchange();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
exchange.writeRequestHeaders(request);
boolean responseHeadersStarted = false;
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
//如果請求(Request)中存在"Expect: 100-continue"這樣的請求頭,就需要在發送請求正文(request body)
//之前等待"HTTP/1.1 100 Continue"的響應。如果我們得不到該相應,則傳回我們得到的結果(例如4**的響應)
//而無需發送請求主體。
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
exchange.flushRequest();
responseHeadersStarted = true;
exchange.responseHeadersStart();
responseBuilder = exchange.readResponseHeaders(true);
}
if (responseBuilder == null) {
if (request.body().isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest();
BufferedSink bufferedRequestBody = Okio.buffer(
exchange.createRequestBody(request, true));
request.body().writeTo(bufferedRequestBody);
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
BufferedSink bufferedRequestBody = Okio.buffer(
exchange.createRequestBody(request, false));
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
} else {
exchange.noRequestBody();
if (!exchange.connection().isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection();
}
}
} else {
exchange.noRequestBody();
}
if (request.body() == null || !request.body().isDuplex()) {
exchange.finishRequest();
}
if (!responseHeadersStarted) {
exchange.responseHeadersStart();
}
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(exchange.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
//收到100-continue,重新嘗試去讀取實際的響應
response = exchange.readResponseHeaders(false)
.request(request)
.handshake(exchange.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
exchange.responseHeadersEnd(response);
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
//連接配接更新了,我們需要確定攔截器能得到非空的響應體;
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(exchange.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
exchange.noNewExchangesOnConnection();
}
//伺服器連接配接成功,但是沒有内容~
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
四、網絡連接配接建立的過程:
我們已經知道了如何使用OkHttp進行網絡請求,以及建立網絡請求添加了哪些攔截器等等,但是它是怎麼建立網絡連接配接的呢?這就是這部分我們要分析的内容。首先,我們已經知道OkHttp是通過ConnectInterceptor攔截器建立與伺服器之間的網絡連接配接。下面我們就通過transmitter.newExchange()方法分析一下連接配接建立的邏輯。在Transmitter中newExchange()方法源碼如下:
/** Returns a new exchange to carry a new request and response. */
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
synchronized (connectionPool) {
if (noMoreExchanges) {
throw new IllegalStateException("released");
}
if (exchange != null) {
throw new IllegalStateException("cannot make a new request because the previous response "
+ "is still open: please call response.close()");
}
}
ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
synchronized (connectionPool) {
this.exchange = result;
this.exchangeRequestDone = false;
this.exchangeResponseDone = false;
return result;
}
}
方法中的exchangeFinder就是用于查找連接配接關系的對象,關于為什麼需要查找,這部分的邏輯需要檢視ExchangeFinder中關于查找政策的相關内容。在ExchangeFinder中的find()方法中我們看到“
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
”在這裡調用了findHealthyConnection()方法來擷取RealConnection執行個體對象,我們猜測它就是真實的網絡連接配接吧。我們繼續閱讀findHealthyConnection()方法,我們看到裡面通過死循環去查找連接配接狀況良好的RealConnection執行個體對象candidate,而關于candidate則是通過findConnection()方法查找的,下面我們就看一下findConnection()方法的源碼:
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
//需要查找的連接配接對象
RealConnection result = null;
Route selectedRoute = null;
RealConnection releasedConnection;
Socket toClose;
//擷取池中的連接配接
synchronized (connectionPool) {
if (transmitter.isCanceled()) throw new IOException("Canceled");
hasStreamFailure = false; // This is a fresh attempt.
//嘗試使用已經配置設定的連接配接;
releasedConnection = transmitter.connection;
toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
? transmitter.releaseConnectionNoEvents()
: null;
if (transmitter.connection != null) {
// We had an already-allocated connection and it's good.
//已配置設定的連接配接正常
result = transmitter.connection;
releasedConnection = null;
}
//省略部分代碼
...
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
//找到連接配接,事件回調
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
//擷取的連接配接已配置設定或者已經放入連接配接池了,就代表已經完成了。
return result;
}
// If we need a route selection, make one. This is a blocking operation.
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
List<Route> routes = null;
synchronized (connectionPool) {
//省略部分代碼
...
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
//重建立立連接配接
result = new RealConnection(connectionPool, selectedRoute);
connectingConnection = result;
}
}
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// Do TCP + TLS handshakes. This is a blocking operation.
//RealConnection 進行連接配接嘗試,是阻塞時操作;
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
connectionPool.routeDatabase.connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
connectingConnection = null;
// Last attempt at connection coalescing, which only occurs if we attempted multiple
// concurrent connections to the same host.
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
// We lost the race! Close the connection we created and return the pooled connection.
result.noNewExchanges = true;
socket = result.socket();
result = transmitter.connection;
// It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
// that case we will retry the route we just successfully connected with.
nextRouteToTry = selectedRoute;
} else {
connectionPool.put(result);
transmitter.acquireConnectionNoEvents(result);
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
上述代碼的連接配接查找或者建立政策就是:
1>如果目前的網絡連接配接呼叫已經具有滿足條件的連接配接,則使用該連接配接;
2>如果連接配接池中有一個可以滿足的連接配接,則使用該連接配接;
3>如果沒有符合條件的,則嘗試重新建立新的連接配接;
result.connect()則是進行TCP握手的操作;下面我們就看一下在Real Connection中連接配接的具體邏輯:
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
EventListener eventListener) {
if (protocol != null) throw new IllegalStateException("already connected");
RouteException routeException = null;
List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
...
while (true) {
try {
//路由連結通道需要Http代理(HTTP proxy)
if (route.requiresTunnel()) {
//通過代理通道建立HTTPS連接配接去完成工作~
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
if (rawSocket == null) {//建立的socket對象不存在
// We were unable to connect the tunnel but properly closed down our resources.
break;
}
} else {
//通過原生的Socket建立一個完全的HTTP或者HTTPS連接配接去完成工作~
connectSocket(connectTimeout, readTimeout, call, eventListener);
}
//确定連接配接協定~
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
break;
} catch (IOException e) {
//發生異常:關閉連接配接,釋放資源...
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
http2Connection = null;
eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);
if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}
if (route.requiresTunnel() && rawSocket == null) {
ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
+ MAX_TUNNEL_ATTEMPTS);
throw new RouteException(exception);
}
//擷取目前可承載最大的連接配接數;
if (http2Connection != null) {
synchronized (connectionPool) {
allocationLimit = http2Connection.maxConcurrentStreams();
}
}
}
RealConnection執行個體對象通過調用該方法與伺服器建立連接配接,建立連接配接有兩種形式:一種是通過connectTunnel()方法建立連接配接。該方法是主要是通過connectSocket()方法建立的連接配接;另一種就是直接通過connectSocket()建立連接配接。差別是前者使用了代理通道,而後者使用了原始的socket建立的連接配接。
下面我們看一下connectSocket()方法的源碼。
/** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */
private void connectSocket(int connectTimeout, int readTimeout, Call call,
EventListener eventListener) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
//初始化Socket對象;
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
eventListener.connectStart(call, route.socketAddress(), proxy);
rawSocket.setSoTimeout(readTimeout);
try {
//使用原生Socket建立連接配接;
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
//初始化資料緩沖區;
try {
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
最終我們看到OkHttp是使用原生的Socket建立的連接配接和進行資料的通信的。
>>推廣:
想要一起學習理财的程式員/程式猿們可以關注我的公衆号:久财之道。
我們是努力學習股票理财知識,緻力于提升生活品質的小散們。
