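1. Introduction
Read source code with a purpose. For OkHttp, we want to understand the following:
- the flow of an OkHttp network request;
- what interceptors do and how they are executed (the chain of responsibility);
- how OkHttp controls caching;
- how OkHttp reuses connections.
This article is based on OkHttp 4.9.1, the latest release at the time of writing.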
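2. Using OkHttp
Using OkHttp takes three steps:
- Build an OkHttpClient object. There are two ways to do this. The second is recommended because it allows each option to be configured flexibly. Many more parameters are available; only the most common ones are listed here.
// Option 1: call the constructor directly
val defaultClient = OkHttpClient()
// Option 2: the Builder pattern
val client = OkHttpClient.Builder()
    // Add an application interceptor, e.g. for logging or adding global request fields
    //.addInterceptor()
    // Add a network interceptor
    //.addNetworkInterceptor()
    // Timeout for the whole call; 0 means no limit
    .callTimeout(10, TimeUnit.SECONDS)
    // Connect timeout
    .connectTimeout(5, TimeUnit.SECONDS)
    // Read timeout
    .readTimeout(5, TimeUnit.SECONDS)
    // Write timeout
    .writeTimeout(5, TimeUnit.SECONDS)
    // Cache
    //.cache()
    .build()
- Build the request. Note the difference between addHeader and header. In HTTP, one header name may map to several values: addHeader appends one more value for the name, whereas header removes all existing values for the name and then sets a single value.
val request = Request.Builder()
    // Add a request header
    .addHeader("key", "value")
    // Set a request header; note the difference from addHeader
    .header("key", "value")
    // Cache control
    .cacheControl(CacheControl.FORCE_NETWORK)
    // GET request
    //.get()
    // POST request
    //.post("{\"key\": \"value\"}".toRequestBody("application/json".toMediaType()))
    // The request method can also be specified through method()
    .method("GET", null)
    // Request URL
    .url("https://www.baidu.com")
    .build()
- Send the request and handle the result. OkHttp offers two ways to do this: synchronous and asynchronous. A synchronous request executes on the current thread and returns the result, so it may block that thread. An asynchronous request executes on a background thread pool and delivers the result to the caller through a callback.
// Synchronous request
val response = client.newCall(request).execute()
// Print the result
println(response.body?.string() ?: "")
// Asynchronous request
client.newCall(request).enqueue(object : Callback {
    override fun onFailure(call: Call, e: IOException) {
    }
    override fun onResponse(call: Call, response: Response) {
        println(response.body?.string() ?: "")
    }
})
WebSocket usage is not covered here.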
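The difference between addHeader and header described in the request-building step can be modelled in a few lines. The sketch below is only an illustration: HeaderSketch is a hypothetical class, not OkHttp's own header builder, and it assumes header names are compared case-insensitively.

```kotlin
// Hypothetical stand-in for a header builder; not OkHttp's Headers.Builder.
class HeaderSketch {
    private val entries = mutableListOf<Pair<String, String>>()

    // Appends a value, keeping any values already stored under the same name.
    fun addHeader(name: String, value: String): HeaderSketch {
        entries += name to value
        return this
    }

    // Removes every existing value for the name, then stores the new one.
    fun header(name: String, value: String): HeaderSketch {
        entries.removeAll { it.first.equals(name, ignoreCase = true) }
        entries += name to value
        return this
    }

    // Returns all values stored under the name, in insertion order.
    fun values(name: String): List<String> =
        entries.filter { it.first.equals(name, ignoreCase = true) }.map { it.second }
}
```

Calling addHeader twice with the same name leaves two values in place, while a later header call with that name collapses them to one.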
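2. The OkHttp request flow
As shown above, both synchronous and asynchronous requests start by calling OkHttpClient#newCall:
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
This method is very simple: it just creates a RealCall object. A synchronous request then runs RealCall#execute: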
override fun execute(): Response {
// A RealCall object cannot be executed twice
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
callStart()
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}
The synchronous path is very simple. As the method name suggests, getResponseWithInterceptorChain() is where the response is actually obtained.
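The check(executed.compareAndSet(false, true)) guard at the top of execute() is what makes a call single-use. A minimal sketch of the same idea, using a hypothetical OneShotTask class that is not part of OkHttp:

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical class illustrating the one-shot guard; not part of OkHttp.
class OneShotTask(private val work: () -> String) {
    private val executed = AtomicBoolean(false)

    fun execute(): String {
        // compareAndSet succeeds for exactly one caller, even under contention;
        // every later caller sees `true` and fails the check.
        check(executed.compareAndSet(false, true)) { "Already Executed" }
        return work()
    }
}
```

The first execute() returns the result of the work; any further call throws IllegalStateException, which is how Kotlin's check reports a failed precondition.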
An asynchronous request runs RealCall#enqueue instead:
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
AsyncCall is an inner class of RealCall that also implements the Runnable interface, so its main logic lives in the run method:
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
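Here we can see when each callback is invoked, and also that the response is again obtained by calling getResponseWithInterceptorChain(). Synchronous and asynchronous requests therefore differ only in the thread they run on; the underlying work is the same.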
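The relationship between the two paths can be sketched as follows. CallSketch is a hypothetical class, and its work parameter stands in for getResponseWithInterceptorChain(). Unlike the real run() method, this simplified version does not guard against signalling the callback twice.

```kotlin
import java.util.concurrent.ExecutorService

// Hypothetical call wrapper; `work` stands in for getResponseWithInterceptorChain().
class CallSketch(private val work: () -> String) {
    // Synchronous: runs on the caller's thread and returns the result directly.
    fun execute(): String = work()

    // Asynchronous: the same work runs on a pool thread and reports through callbacks.
    fun enqueue(
        executor: ExecutorService,
        onResponse: (String) -> Unit,
        onFailure: (Exception) -> Unit
    ) {
        executor.execute {
            try {
                onResponse(work())
            } catch (e: Exception) {
                onFailure(e)
            }
        }
    }
}
```

Both entry points invoke the same work function; only the thread and the way the result is delivered differ.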
Next, let's look at what getResponseWithInterceptorChain() does:
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
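What happens here is simple: the interceptors configured by the developer and the built-in interceptors are assembled into a RealInterceptorChain, which is then used to obtain and return the response. Note the order of the interceptors: first client.interceptors, i.e. those added with OkHttpClient.Builder#addInterceptor; then RetryAndFollowUpInterceptor, BridgeInterceptor, CacheInterceptor and ConnectInterceptor; then client.networkInterceptors, i.e. those added with OkHttpClient.Builder#addNetworkInterceptor (skipped for WebSocket calls); and finally CallServerInterceptor. The role of each built-in interceptor is largely evident from its name.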
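3. Dispatcher
As we saw, both the synchronous and asynchronous paths call methods on client.dispatcher, an instance of okhttp3.Dispatcher. Classes named XXDispatcher or Dispatcher are generally responsible for dispatching and scheduling.
Let's look at the synchronous request code again: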
override fun execute(): Response {
// A RealCall object cannot be executed twice
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
callStart()
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}
This calls Dispatcher.executed() before the request and Dispatcher.finished() after it:
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
/** Used by [Call.execute] to signal completion. */
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
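In other words, the RealCall object is added to the runningSyncCalls queue when the request starts and removed once it finishes. runningSyncCalls only records the call so that the current state can be queried and calls can be cancelled together; it involves no thread scheduling.
Now the asynchronous request: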
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
This calls Dispatcher's enqueue method:
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
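The AsyncCall is first added to the ready queue readyAsyncCalls. promoteAndExecute() then checks whether the maximum number of concurrent requests or the maximum number of concurrent requests per host has been reached. If neither limit has been reached, the asyncCall is moved to the running queue runningAsyncCalls, and asyncCall.executeOn(executorService) is called to run the request on the thread pool.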
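The promotion rule can be modelled without any networking. The sketch below is a simplified, single-threaded model under stated assumptions: DispatcherSketch is a hypothetical class, calls are represented only by their host name, and the default limits of 64 and 5 mirror the defaults of okhttp3.Dispatcher.

```kotlin
// Simplified model of the dispatcher's queues; names and structure are assumptions.
class DispatcherSketch(
    private val maxRequests: Int = 64,
    private val maxRequestsPerHost: Int = 5
) {
    private val ready = ArrayDeque<String>()      // hosts of calls waiting to run
    private val running = mutableListOf<String>() // hosts of calls currently running

    // Queues a call for the given host and returns the hosts promoted to running.
    fun enqueue(host: String): List<String> {
        ready.addLast(host)
        return promote()
    }

    // Marks one running call for the host as finished, then promotes waiting calls.
    fun finished(host: String): List<String> {
        check(running.remove(host)) { "Call wasn't in-flight!" }
        return promote()
    }

    private fun promote(): List<String> {
        val promoted = mutableListOf<String>()
        val iterator = ready.iterator()
        while (iterator.hasNext()) {
            val host = iterator.next()
            if (running.size >= maxRequests) break // global limit reached
            if (running.count { it == host } >= maxRequestsPerHost) continue // host limit reached
            iterator.remove()
            running += host
            promoted += host
        }
        return promoted
    }

    fun runningCount(): Int = running.size
    fun readyCount(): Int = ready.size
}
```

A call that hits the per-host limit is skipped with continue, so calls to other hosts further back in the queue can still be promoted, whereas the global limit stops the scan entirely.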
Once execution completes, the AsyncCall is removed from runningAsyncCalls.
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
...
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
// Remove from the running queue
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
// This method was shown above; the middle is omitted for brevity
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
...
} finally {
// Remove from the running queue
client.dispatcher.finished(this)
}
}
}
}
So requests are executed through an ExecutorService. A custom Dispatcher, and with it a custom ExecutorService, can be supplied via OkHttpClient.Builder#dispatcher().
The default ExecutorService is:
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
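The constructor arguments of the default pool are easier to read when annotated. The following sketch builds a pool of the same shape with plain java.util.concurrent classes; the function name newUnboundedPool is hypothetical, and the OkHttp-specific thread factory is omitted.

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Builds a pool with the same shape as the default above; the name is hypothetical.
fun newUnboundedPool(): ExecutorService =
    ThreadPoolExecutor(
        0,                           // no core threads: an idle pool holds no threads
        Int.MAX_VALUE,               // no practical upper bound on the thread count
        60, TimeUnit.SECONDS,        // idle threads are reclaimed after 60 seconds
        SynchronousQueue<Runnable>() // no buffering: each task is handed straight to a thread
    )
```

Because the pool itself is effectively unbounded, the real limit on concurrency comes from the Dispatcher's maxRequests and maxRequestsPerHost checks, not from the executor.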
4. Executing the interceptor chain
Let's go back to getResponseWithInterceptorChain():
internal fun getResponseWithInterceptorChain(): Response {
...
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
...
return response
} catch (e: IOException) {
...
} finally {
...
}
}
This calls RealInterceptorChain#proceed:
@Throws(IOException::class)
override fun proceed(request: Request): Response {
...
// Call the next interceptor in the chain.
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
if (exchange != null) {
check(index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
}
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
}
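The key step is calling the intercept method of the interceptor at position index, where index was passed in when the RealInterceptorChain was constructed. Note that the argument passed to intercept is itself a RealInterceptorChain: a copy whose index is the current index + 1.
If an interceptor's intercept method can handle the current request itself, it returns the corresponding Response; otherwise it should call proceed on the chain it received so that the request continues down the chain. This is how the chain of responsibility is executed.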
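The index + 1 mechanism can be reduced to a minimal sketch. StepChain is a hypothetical class, requests and responses are plain strings, and each step is a lambda that receives the chain pointing at the next step.

```kotlin
// Minimal chain-of-responsibility sketch; StepChain is a hypothetical name.
class StepChain(
    private val steps: List<(StepChain) -> String>,
    private val index: Int,
    val request: String
) {
    fun proceed(request: String): String {
        check(index < steps.size) { "no step left to handle the request" }
        // Hand the current step a chain that points at the next step.
        val next = StepChain(steps, index + 1, request)
        return steps[index](next)
    }
}
```

A step that calls chain.proceed(...) can modify the request on the way down and the response on the way back up, while a step that returns without calling proceed short-circuits the rest of the chain. That is the same choice each OkHttp interceptor makes.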
If the developer has not added any custom interceptors, the order in the chain is RetryAndFollowUpInterceptor -> BridgeInterceptor -> CacheInterceptor -> ConnectInterceptor -> CallServerInterceptor. We will now go through these interceptors one by one.
4.1 RetryAndFollowUpInterceptor
RetryAndFollowUpInterceptor handles retries and redirects:
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
while (true) {
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response
var closeActiveExchange = true
try {
// If the call has been canceled, throw immediately
if (call.isCanceled()) {
throw IOException("Canceled")
}
try {
// Continue down the chain to obtain the response
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) {
...
continue
} catch (e: IOException) {
...
continue
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = call.interceptorScopedExchange
// Work out the follow-up request (redirect, authentication or retry), if any
val followUp = followUpRequest(response, exchange)
// No follow-up request: this is the final result, so return it
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
response.body?.closeQuietly()
// Too many follow-ups: throw an exception
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}
@Throws(IOException::class)
private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {
val route = exchange?.connection?.route()
val responseCode = userResponse.code
val method = userResponse.request.method
// Decide on a follow-up request based on the status code
when (responseCode) {
HTTP_PROXY_AUTH -> {
val selectedProxy = route!!.proxy
if (selectedProxy.type() != Proxy.Type.HTTP) {
throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy")
}
return client.proxyAuthenticator.authenticate(route, userResponse)
}
HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)
HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {
return buildRedirectRequest(userResponse, method)
}
HTTP_CLIENT_TIMEOUT -> {
// 408's are rare in practice, but some servers like HAProxy use this response code. The
// spec says that we may repeat the request without modifications. Modern browsers also
// repeat the request (even non-idempotent ones.)
if (!client.retryOnConnectionFailure) {
// The application layer has directed us not to retry the request.
return null
}
val requestBody = userResponse.request.body
if (requestBody != null && requestBody.isOneShot()) {
return null
}
val priorResponse = userResponse.priorResponse
if (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) {
// We attempted to retry and got another timeout. Give up.
return null
}
if (retryAfter(userResponse, 0) > 0) {
return null
}
return userResponse.request
}
HTTP_UNAVAILABLE -> {
val priorResponse = userResponse.priorResponse
if (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) {
// We attempted to retry and got another timeout. Give up.
return null
}
if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
// specifically received an instruction to retry without delay
return userResponse.request
}
return null
}
HTTP_MISDIRECTED_REQUEST -> {
// OkHttp can coalesce HTTP/2 connections even if the domain names are different. See
// RealConnection.isEligible(). If we attempted this and the server returned HTTP 421, then
// we can retry on a different connection.
val requestBody = userResponse.request.body
if (requestBody != null && requestBody.isOneShot()) {
return null
}
if (exchange == null || !exchange.isCoalescedConnection) {
return null
}
exchange.connection.noCoalescedConnections()
return userResponse.request
}
else -> return null
}
}
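This is an infinite loop: it exits only when an exception is thrown or the final Response is obtained. A request timeout exits the loop by throwing an exception. RetryAndFollowUpInterceptor does not handle the request directly; instead it retries after failures and inspects the status code of the Response returned by the chain to decide whether a follow-up request (such as a redirect) is needed.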
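The shape of that loop, reduced to redirects only, might look like the sketch below. SketchResponse and followRedirects are hypothetical names, the send parameter stands in for realChain.proceed, and the default limit of 20 mirrors OkHttp's MAX_FOLLOW_UPS constant. Retries after exceptions and the authentication cases are deliberately left out.

```kotlin
// Hypothetical response model: a status code plus an optional redirect target.
data class SketchResponse(val code: Int, val location: String? = null)

// Follows redirects until a final response arrives or the limit is exceeded.
fun followRedirects(
    startUrl: String,
    maxFollowUps: Int = 20,
    send: (String) -> SketchResponse
): SketchResponse {
    var url = startUrl
    var followUpCount = 0
    while (true) {
        val response = send(url)
        // A 3xx response with a target produces a follow-up; anything else is final.
        val nextUrl = if (response.code in 300..399) response.location else null
        if (nextUrl == null) return response
        if (++followUpCount > maxFollowUps) {
            throw IllegalStateException("Too many follow-up requests: $followUpCount")
        }
        url = nextUrl
    }
}
```

As in the real interceptor, the only ways out of the loop are returning a final response or throwing.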
4.2 BridgeInterceptor
/**
* Bridges from application code to network code. First it builds a network request from a user
* request. Then it proceeds to call the network. Finally it builds a user response from the network
* response.
*/
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
val body = userRequest.body
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
}
/** Returns a 'Cookie' HTTP request header with all cookies, like `a=b; c=d`. */
private fun cookieHeader(cookies: List<Cookie>): String = buildString {
cookies.forEachIndexed { index, cookie ->
if (index > 0) append("; ")
append(cookie.name).append('=').append(cookie.value)
}
}
}
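As we can see, BridgeInterceptor does not handle the request directly either. It rebuilds the developer's Request, adding standard HTTP request headers such as "Content-Length", and it also post-processes the returned response, for example by transparently decompressing a gzip body that it requested itself.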
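Two of the header rules in BridgeInterceptor can be isolated as small pure functions. This is a sketch under simplifying assumptions: the function names are hypothetical, headers are modelled as a plain Map with case-sensitive names, and cookies and User-Agent are ignored.

```kotlin
// Returns the framing header for a request body of the given length (-1 = unknown).
fun bodyFramingHeaders(contentLength: Long): Map<String, String> =
    if (contentLength != -1L) {
        mapOf("Content-Length" to contentLength.toString())
    } else {
        mapOf("Transfer-Encoding" to "chunked")
    }

// Adds default headers only where the caller has not set them already.
fun withDefaults(userHeaders: Map<String, String>, host: String): Map<String, String> {
    val result = userHeaders.toMutableMap()
    result.putIfAbsent("Host", host)
    result.putIfAbsent("Connection", "Keep-Alive")
    // Transparent gzip is requested only when the caller set neither header.
    if ("Accept-Encoding" !in userHeaders && "Range" !in userHeaders) {
        result["Accept-Encoding"] = "gzip"
    }
    return result
}
```

The Range check matters because a byte range of a compressed stream cannot be decompressed on its own, so the interceptor leaves such requests alone.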
4.3 CacheInterceptor
As the class name indicates, CacheInterceptor is the interceptor that handles caching:
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
// Compute the cache strategy
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
// cacheCandidate is non-null but cacheResponse is null: the cached response is not usable
if (cacheCandidate != null && cacheResponse == null) {
// The cache candidate wasn't applicable. Close it.
cacheCandidate.body?.closeQuietly()
}
// The strategy forbids the network (only-if-cached) but there is no usable cached response: fail with a 504 Response
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}
// No network request needed: return the cached response directly
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}
if (cacheResponse != null) {
listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) { // Cache miss
listener.cacheMiss(call)
}
var networkResponse: Response? = null
try {
// Proceed down the chain; this is where the network request is made
networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
// The server answered 304 (HTTP_NOT_MODIFIED)
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
cacheResponse.body?.closeQuietly()
}
}
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
// Update the cache
if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
// This will log a conditional cache miss only.
listener.cacheMiss(call)
}
}
}
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}
return response
}
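The logic is clear: the cache strategy and the validity of the local cache decide whether to go to the network.
- If the strategy allows only cached responses and forbids the network, but there is no valid local cache, the request fails.
- If there is a local cache and the strategy says no network request is needed, the cached result is returned directly.
- If a conditional request returns HTTP 304, the local cache is still valid and is wrapped into a Response and returned.
- Otherwise the chain continues; the local cache (if any) is attached in the cacheResponse field, the local cache is updated or not depending on the cache policy, and the result is returned.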
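The four branches listed above can be summarised as a single decision function. This is only a sketch: CacheOutcome and decide are hypothetical names, needsNetwork and hasCache correspond to strategy.networkRequest != null and strategy.cacheResponse != null, and networkCode is the status code returned by the rest of the chain when it is called.

```kotlin
// Outcome names are hypothetical labels for the four branches described above.
enum class CacheOutcome { FAIL_504, SERVE_CACHE, REVALIDATED_CACHE, NETWORK }

// Maps the cache strategy and the network status code to one of the four outcomes.
fun decide(needsNetwork: Boolean, hasCache: Boolean, networkCode: Int?): CacheOutcome = when {
    !needsNetwork && !hasCache -> CacheOutcome.FAIL_504          // only-if-cached, nothing cached
    !needsNetwork -> CacheOutcome.SERVE_CACHE                    // fresh cache, no network call
    hasCache && networkCode == 304 -> CacheOutcome.REVALIDATED_CACHE // conditional GET confirmed the cache
    else -> CacheOutcome.NETWORK                                 // use the network response
}
```

The order of the branches matters: the two cases that avoid the network are tested first, so networkCode is consulted only when a network request was actually made.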
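Next, let's see how the cache strategy is computed, i.e. CacheStrategy.Factory#compute(). Note that CacheStrategy.Factory takes three constructor parameters (the current time, the current request and the locally cached response), while CacheStrategy takes two: the network request and the cached response.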
fun compute(): CacheStrategy {
val candidate = computeCandidate()
// A network request would be needed, but the request is only-if-cached: the call is bound to fail
if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) {
return CacheStrategy(null, null)
}
return candidate
}
private fun computeCandidate(): CacheStrategy {
// No local cached response
if (cacheResponse == null) {
return CacheStrategy(request, null)
}
// For HTTPS, a cached response without a handshake is treated as invalid
if (request.isHttps && cacheResponse.handshake == null) {
return CacheStrategy(request, null)
}
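// If this response should not have been cached, the cached copy is invalid. The check would be redundant if the caching rules never changed between identical requests.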
if (!isCacheable(cacheResponse, request)) {
return CacheStrategy(request, null)
}
// Use the caller's cache control and conditional headers (If-Modified-Since, If-None-Match) to decide whether the local cache may be used
val requestCaching = request.cacheControl
if (requestCaching.noCache || hasConditions(request)) {
return CacheStrategy(request, null)
}
val responseCaching = cacheResponse.cacheControl
val ageMillis = cacheResponseAge()
var freshMillis = computeFreshnessLifetime()
if (requestCaching.maxAgeSeconds != -1) {
freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
}
var minFreshMillis: Long = 0
if (requestCaching.minFreshSeconds != -1) {
minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
}
var maxStaleMillis: Long = 0
if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
}
// The cached response is fresh enough: skip the network and use only the local cache
if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
val builder = cacheResponse.newBuilder()
if (ageMillis + minFreshMillis >= freshMillis) {
builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
}
val oneDayMillis = 24 * 60 * 60 * 1000L
if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
}
return CacheStrategy(null, builder.build())
}
// Find a condition to add to the request. If the condition is satisfied, the response body
// will not be transmitted.
val conditionName: String
val conditionValue: String?
when {
etag != null -> {
conditionName = "If-None-Match"
conditionValue = etag
}
lastModified != null -> {
conditionName = "If-Modified-Since"
conditionValue = lastModifiedString
}
servedDate != null -> {
conditionName = "If-Modified-Since"
conditionValue = servedDateString
}
else -> return CacheStrategy(request, null) // No condition! Make a regular request.
}
val conditionalRequestHeaders = request.headers.newBuilder()
conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)
val conditionalRequest = request.newBuilder()
.headers(conditionalRequestHeaders.build())
.build()
return CacheStrategy(conditionalRequest, cacheResponse)
}
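The freshness test at the heart of computeCandidate() is a single inequality. The sketch below restates it with the same variable names as parameters; the function names are hypothetical, and all values are in milliseconds.

```kotlin
// True when the cached response may be served without contacting the network.
fun canServeFromCache(
    ageMillis: Long,
    freshMillis: Long,
    minFreshMillis: Long = 0,
    maxStaleMillis: Long = 0
): Boolean = ageMillis + minFreshMillis < freshMillis + maxStaleMillis

// True when a served response is already past its freshness lifetime
// (the case in which the "Response is stale" warning header is added).
fun isStale(ageMillis: Long, freshMillis: Long, minFreshMillis: Long = 0): Boolean =
    ageMillis + minFreshMillis >= freshMillis
```

For example, a response that is 90 seconds old with a 60-second freshness lifetime is not served from cache by default, but it is served (and flagged as stale) if the request allows max-stale of 60 seconds.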
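From the strategy computation we can see that caching is controlled jointly by the request's cacheControl and by the HTTP protocol itself. Note that cache in OkHttpClient.Builder defaults to null, so caching is disabled by default. It can be enabled as follows: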
val client = OkHttpClient.Builder()
.cache(Cache(File("your cache path"), 10 * 1024 * 1024))
.build()
OkHttp's Cache is an LRU cache, i.e. it evicts the least recently used entries first; the concrete implementation is okhttp3.internal.cache.DiskLruCache.
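The LRU eviction policy itself can be illustrated with an in-memory sketch built on LinkedHashMap in access order. This is not how DiskLruCache is written: LruSketch is a hypothetical class that bounds the number of entries in memory, whereas the real cache stores entries on disk and bounds their total size in bytes.

```kotlin
// In-memory LRU sketch; illustrates the eviction policy only.
class LruSketch<K, V>(private val maxEntries: Int) {
    // accessOrder = true moves an entry to the end whenever it is read or written,
    // so the first entry is always the least recently used one.
    private val map = object : LinkedHashMap<K, V>(16, 0.75f, true) {
        override fun removeEldestEntry(eldest: MutableMap.MutableEntry<K, V>?): Boolean =
            size > maxEntries
    }

    fun get(key: K): V? = map[key]
    fun put(key: K, value: V) { map[key] = value }

    // Keys ordered from least to most recently used.
    fun keys(): List<K> = map.keys.toList()
}
```

Reading an entry counts as a use, so an old entry that is still being read survives while a newer but untouched entry is evicted first.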
4.4 ConnectInterceptor
ConnectInterceptor is the connection interceptor, and it is where connection reuse happens. The ConnectInterceptor class itself is remarkably short:
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
}
It mainly calls RealCall#initExchange. Let's see what that method does:
internal fun initExchange(chain: RealInterceptorChain): Exchange {
synchronized(this) {
check(expectMoreExchanges) { "released" }
check(!responseBodyOpen)
check(!requestBodyOpen)
}
val exchangeFinder = this.exchangeFinder!!
val codec = exchangeFinder.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder, codec)
this.interceptorScopedExchange = result
this.exchange = result
synchronized(this) {
this.requestBodyOpen = true
this.responseBodyOpen = true
}
if (canceled) throw IOException("Canceled")
return result
}
It performs a few checks, obtains an ExchangeCodec object (codec) from exchangeFinder, and then creates and returns a new Exchange object.
First, ExchangeFinder#find:
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
return resultConnection.newCodec(client, chain)
} catch ...
}
@Throws(IOException::class)
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
while (true) {
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
// Confirm that the connection is good.
if (candidate.isHealthy(doExtensiveHealthChecks)) {
return candidate
}
// If it isn't, take it out of the pool.
candidate.noNewExchanges()
// Make sure we have some routes left to try. One example where we may exhaust all the routes
// would happen if we made a new connection and it immediately is detected as unhealthy.
if (nextRouteToTry != null) continue
val routesLeft = routeSelection?.hasNext() ?: true
if (routesLeft) continue
val routesSelectionLeft = routeSelector?.hasNext() ?: true
if (routesSelectionLeft) continue
throw IOException("exhausted all routes")
}
}
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*
* This checks for cancellation before each blocking operation.
*/
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
// The call has been canceled: throw immediately
if (call.isCanceled()) throw IOException("Canceled")
// First check whether the call already has a connection allocated
val callConnection = call.connection
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
toClose = call.releaseConnectionNoEvents()
}
}
// A connection is still allocated: reuse it
if (call.connection != null) {
check(toClose == null)
return callConnection
}
// The connection was released
toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
}
// We need a new connection. Give it fresh stats.
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
// Try to get a connection from the pool
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
// No suitable connection in the pool, so a new one must be created
// First find a route
val routes: List<Route>?
val route: Route
if (nextRouteToTry != null) {
// Use a route from a preceding coalesced connection.
routes = null
route = nextRouteToTry!!
nextRouteToTry = null
} else if (routeSelection != null && routeSelection!!.hasNext()) {
// Use a route from an existing route selection.
routes = null
route = routeSelection!!.next()
} else {
// Compute a new route selection. This is a blocking operation!
var localRouteSelector = routeSelector
if (localRouteSelector == null) {
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector
}
val localRouteSelection = localRouteSelector.next()
routeSelection = localRouteSelection
routes = localRouteSelection.routes
if (call.isCanceled()) throw IOException("Canceled")
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. We have a better chance of matching thanks to connection coalescing.
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
route = localRouteSelection.next()
}
// Create the connection
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())
// If we raced another call connecting to this host, coalesce the connections. This makes for 3
// different lookups in the connection pool!
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
}
synchronized(newConnection) {
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}
eventListener.connectionAcquired(call, newConnection)
return newConnection
}
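Stripped of routes, health checks and HTTP/2 coalescing, the reuse decision in findConnection() comes down to "look in the pool first, create only if nothing fits". The sketch below models just that; ConnSketch and PoolSketch are hypothetical classes, and matching on host and port alone is a simplifying assumption.

```kotlin
// Hypothetical connection model: identified by host and port, reusable unless flagged.
class ConnSketch(val host: String, val port: Int) {
    // Mirrors the idea of RealConnection.noNewExchanges: once set, the
    // connection must not be handed out for new requests.
    var noNewExchanges = false
}

class PoolSketch {
    private val connections = mutableListOf<ConnSketch>()
    var created = 0
        private set

    // Reuses a pooled connection to the same host and port, or opens and pools a new one.
    fun acquire(host: String, port: Int): ConnSketch {
        val pooled = connections.firstOrNull {
            it.host == host && it.port == port && !it.noNewExchanges
        }
        if (pooled != null) return pooled
        val fresh = ConnSketch(host, port)
        created++
        connections += fresh
        return fresh
    }
}
```

Two requests to the same host and port share one connection, and a connection flagged with noNewExchanges is passed over so that a new one is created in its place.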
4.5 CallServerInterceptor
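CallServerInterceptor is where data is actually requested from the server. At its core, HTTP communication is just sending and receiving data over a Socket; HTTP defines the format, and an HTTP request is complete once both sides send and parse data according to the protocol. Most of these details are not needed for everyday app development, so they are not discussed in depth here.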
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
exchange.writeRequestHeaders(request)
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}
if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
// Server sent a 100-continue even though we did not request one. Try again to read the actual
// response status.
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
}
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
exchange.responseHeadersEnd(response)
response = if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
}
return response
}
}
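5. Summary
OkHttp uses the chain-of-responsibility pattern to process network requests. Among the built-in interceptors, only CacheInterceptor and CallServerInterceptor actually produce a response themselves. OkHttp does not enable caching by default, but it ships a file-based cache implementation, DiskLruCache. We can also write custom interceptors for specialized behavior and add them at different positions in the chain with OkHttpClient.Builder's addInterceptor and addNetworkInterceptor.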