天天看點

Android網絡請求架構OkHttp源碼分析

1、開篇

閱讀源碼要有目的,關于OkHttp,我們要搞清楚以下幾點:

  1. OkHttp網絡請求的流程;
  2. 攔截器的作用及執行流程(責任鍊);
  3. OkHttp緩存控制;
  4. OkHttp的連接配接複用機制。

本文基于寫作時的最新的OkHttp 4.9.1版本。

2、OkHttp的使用

OkHttp的使用步驟如下:

  1. 建構OkHttpClient對象,可以有兩種方式
    // 方式一,直接new
    val client = OkHttpClient()
    
    // 方式二,Builder模式
    val client = OkHttpClient.Builder()
        // 增加攔截器,可以實作類似日志記錄、增加全局請求字段等類似的功能
        //.addInterceptor()
        // 增加網絡攔截器
        //.addNetworkInterceptor()
        // 設定請求逾時時間, 0表示不限時間
        .callTimeout(10, TimeUnit.SECONDS)
        // 設定逾時時間
        .connectTimeout(5, TimeUnit.SECONDS)
        // 設定讀逾時時間
        .readTimeout(5, TimeUnit.SECONDS)
        // 設定寫逾時時間
        .writeTimeout(5, TimeUnit.SECONDS)
        // 設定緩存
        //.cache()
        .build()
               
    建議采用第二種方式,可以更加靈活的進行各項配置。其中仍然有許多參數可以設定,這裡隻列出了一些最常見的。
  2. 建構請求
    val request = Request.Builder()
        // 增加請求頭
        .addHeader("key", "value")
        // 設定請求頭,注意和addHeader的差別
        .header("key", "value")
        // 緩存控制
        .cacheControl(CacheControl.FORCE_NETWORK)
        // GET請求
        //.get()
        // POST請求
        //.post("{\"key\": \"value\"}".toRequestBody("application/json".toMediaType()))
        // 也可以通過method來靈活指定請求方法
        .method("GET", null)
        // 請求的URL
        .url("https://www.baidu.com")
        .build()
               
    這裡需要注意addHeader和header兩個方法的差別。Http中,一個header的key是可以對應多個value的,addHeader方法是給相應的key增加的一個value,而header方法則是把現有的value全部清空,然後增加一個value。
  3. 發起請求并處理請求結果。OkHttp提供了兩種請求方式:同步請求和異步請求。同步請求會在目前線程執行請求并傳回結果,是以可能會阻塞線程。異步請求會在背景線程池執行請求,并通過回調的方式回傳結果給調用者。
    // 同步請求
    val response = client.newCall(request).execute()
    // 輸出結果
    println(response.body?.string() ?: "")
    
    
    // 異步請求
    client.newCall(request).enqueue(object : Callback {
        override fun onFailure(call: Call, e: IOException) {
            
        }
    
        override fun onResponse(call: Call, response: Response) {
            println(response.body?.string() ?: "")
        }
    })
               
這裡不讨論WebSocket的相關用法。

2、OkHttp的請求流程

從上面我們可以看到,無論是同步請求還是異步請求,都先調用了OkHttpClinent的newcall方法:

此方法非常簡單,就是new了一個RealCall對象,同步請求的時候,執行的是RealCall#execute方法:

override fun execute(): Response {
    // 一個RealCall對象不能執行兩次
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
}
           

同步請求的方法非常簡單,從方法名可以看出getResponseWithInterceptorChain()是真正擷取請求響應的地方。

異步請求的時候執行的是RealCall#enqueue方法:

override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
}
           

AsyncCall是Realcall的内部類,同時它也實作了Runnable接口,是以主要的執行邏輯肯定是在其run方法:

override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          client.dispatcher.finished(this)
        }
      }
}
           

這裡我們可以看到callback回調執行的情況,同時也可以看出,它也是通過調用getResponseWithInterceptorChain()擷取請求響應的。是以同步異步隻是執行的線程不一樣,本質上是一樣的。

接下來我們來看看getResponseWithInterceptorChain()都幹了些什麼

internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
}
           

這裡做的事情很簡單,把開發者設定的各種攔截器和一些内置的攔截器封裝成RealInterceptorChain,通過它擷取請求響應并傳回。注意攔截器的順序,先是client.interceptors,也就是OkHttpClient.Builder#addInterceptor設定的攔截器,然後是RetryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor、ConnectInterceptor,再後面是client.networkInterceptors也就是OkHttpClient.Builder#addNetworkInterceptor設定的攔截器,最後是CallServerInterceptor。這些内置的攔截器基本上從命名上就可以看出它們的作用。

3、Dispatcher

可以看到無論同步還是異步,都調用了client.dispatcher的相關方法,它是okhttp3.Dispatcher的執行個體。我們知道一般叫做XXDispatcher或者Dispatcher的類都是負責分發和排程的。

再看一下同步請求的代碼

override fun execute(): Response {
    // 一個RealCall對象不能執行兩次
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
}
           

這裡先後調用了Dispatcher.executed()和Dispatcher.finished()

@Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
}

/** Used by [Call.execute] to signal completion. */
internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
}

private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }

    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
      idleCallback.run()
    }
}
           

也就是在執行請求的時候,把這個RealCall對象添加到runningSyncCalls隊列中,在請求結束之後移除。這個runningSyncCalls的作用是記錄這個請求以便判斷目前的狀态以及友善統一取消請求,不涉及線程的排程。

再看異步請求

override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
}
           

這裡調用了Dispatcher的enqueue方法

internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
}

private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
}
           

這裡先把這個AsyncCall添加到就緒隊列readyAsyncCalls,然後在promoteAndExecute()中檢查檢查目前是否達到最大并發請求數量和單個Host的最大并發請求數量,如果都沒有,就把asyncCall添加到執行隊列runningAsyncCalls中,最後調用了asyncCall.executeOn(executorService),線上程池中執行這個請求。

執行完成後,會從runningAsyncCalls中移除這個AsyncCall。

internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {

    ...

    fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          // 從執行隊列移除 
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }

    // 這個方法上面貼過了,節省篇幅,省略中間的代碼
    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          ...
        } finally {
          // 從執行隊列移除 
          client.dispatcher.finished(this)
        }
      }
    }
}
           

也就是通過ExecutorService來執行請求。我們可以通過OkHttpClient.Builder#dispatcher()設定自定義的Dispatcher和ExecutorService。

預設的ExecutorService如下

@get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
}
           

4、攔截器責任鍊的執行

再回頭看getResponseWithInterceptorChain()

internal fun getResponseWithInterceptorChain(): Response {
    ...

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      ...
      return response
    } catch (e: IOException) {
      ...
    } finally {
      ...
    }
}
           

這裡調用了RealInterceptorChain#proceed

@Throws(IOException::class)
  override fun proceed(request: Request): Response {
    ...

    // Call the next interceptor in the chain.
    val next = copy(index = index + 1, request = request)
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    if (exchange != null) {
      check(index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
      }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
}
           

這裡主要是調用了第index個攔截器的intercept方法,index是RealInterceptorChain對象構造時的傳進來的。注意到intercept是RealInterceptorChain對象,隻是傳入的index是目前index+1。

各個Interceptor的intercept方法如果能處理目前的請求,那麼則傳回處理結果對應的Response對象,否則應該調用入參的proceed方法,繼續在責任鍊中傳遞。這就是請求責任鍊的執行過程。

如果開發者沒有設定自定義的攔截器,那麼責任鍊中的順序是RetryAndFollowUpInterceptor -> BridgeInterceptor -> CacheInterceptor -> ConnectInterceptor -> CallServerInterceptor,接下來我們一個個分析這些攔截器的作用。

4.1 RetryAndFollowUpInterceptor

RetryAndFollowUpInterceptor負責重試和重定向的處理

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    var request = chain.request
    val call = realChain.call
    var followUpCount = 0
    var priorResponse: Response? = null
    var newExchangeFinder = true
    var recoveredFailures = listOf<IOException>()
    while (true) {
      call.enterNetworkInterceptorExchange(request, newExchangeFinder)

      var response: Response
      var closeActiveExchange = true
      try {
        // 如果請求已經取消了,直接抛出異常
        if (call.isCanceled()) {
          throw IOException("Canceled")
        }

        try {
          // 繼續在責任鍊中傳遞,擷取response
          response = realChain.proceed(request)
          newExchangeFinder = true
        } catch (e: RouteException) {
          ...
          continue
        } catch (e: IOException) {
          ...
          continue
        }

        // Attach the prior response if it exists. Such responses never have a body.
        if (priorResponse != null) {
          response = response.newBuilder()
              .priorResponse(priorResponse.newBuilder()
                  .body(null)
                  .build())
              .build()
        }

        val exchange = call.interceptorScopedExchange
        // 處理重定向,生成新的Request
        val followUp = followUpRequest(response, exchange)

        // 如果沒有新的Request生成,那麼這個就是最終結果,直接傳回
        if (followUp == null) {
          if (exchange != null && exchange.isDuplex) {
            call.timeoutEarlyExit()
          }
          closeActiveExchange = false
          return response
        }

        val followUpBody = followUp.body
        if (followUpBody != null && followUpBody.isOneShot()) {
          closeActiveExchange = false
          return response
        }

        response.body?.closeQuietly()

        // 重定向超過最大限制次數,抛出異常
        if (++followUpCount > MAX_FOLLOW_UPS) {
          throw ProtocolException("Too many follow-up requests: $followUpCount")
        }

        request = followUp
        priorResponse = response
      } finally {
        call.exitNetworkInterceptorExchange(closeActiveExchange)
      }
    }
}

@Throws(IOException::class)
private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {
    val route = exchange?.connection?.route()
    val responseCode = userResponse.code

    val method = userResponse.request.method
    // 通過狀态碼來判斷是否重定向
    when (responseCode) {
      HTTP_PROXY_AUTH -> {
        val selectedProxy = route!!.proxy
        if (selectedProxy.type() != Proxy.Type.HTTP) {
          throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy")
        }
        return client.proxyAuthenticator.authenticate(route, userResponse)
      }

      HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)

      HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {
        return buildRedirectRequest(userResponse, method)
      }

      HTTP_CLIENT_TIMEOUT -> {
        // 408's are rare in practice, but some servers like HAProxy use this response code. The
        // spec says that we may repeat the request without modifications. Modern browsers also
        // repeat the request (even non-idempotent ones.)
        if (!client.retryOnConnectionFailure) {
          // The application layer has directed us not to retry the request.
          return null
        }

        val requestBody = userResponse.request.body
        if (requestBody != null && requestBody.isOneShot()) {
          return null
        }
        val priorResponse = userResponse.priorResponse
        if (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) {
          // We attempted to retry and got another timeout. Give up.
          return null
        }

        if (retryAfter(userResponse, 0) > 0) {
          return null
        }

        return userResponse.request
      }

      HTTP_UNAVAILABLE -> {
        val priorResponse = userResponse.priorResponse
        if (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) {
          // We attempted to retry and got another timeout. Give up.
          return null
        }

        if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
          // specifically received an instruction to retry without delay
          return userResponse.request
        }

        return null
      }

      HTTP_MISDIRECTED_REQUEST -> {
        // OkHttp can coalesce HTTP/2 connections even if the domain names are different. See
        // RealConnection.isEligible(). If we attempted this and the server returned HTTP 421, then
        // we can retry on a different connection.
        val requestBody = userResponse.request.body
        if (requestBody != null && requestBody.isOneShot()) {
          return null
        }

        if (exchange == null || !exchange.isCoalescedConnection) {
          return null
        }

        exchange.connection.noCoalescedConnections()
        return userResponse.request
      }

      else -> return null
    }
}
           

這是一個死循環,除非抛出異常或者得到最終的Response才會傳回。請求逾時會通過抛出異常的方式退出循環。RetryAndFollowUpInterceptor沒有直接處理請求,而是對責任鍊傳回的Response進行了重試、狀态碼判斷和重定向處理。

4.2 BridgeInterceptor

/**
 * Bridges from application code to network code. First it builds a network request from a user
 * request. Then it proceeds to call the network. Finally it builds a user response from the network
 * response.
 */
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val userRequest = chain.request()
    val requestBuilder = userRequest.newBuilder()

    val body = userRequest.body
    if (body != null) {
      val contentType = body.contentType()
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString())
      }

      val contentLength = body.contentLength()
      if (contentLength != -1L) {
        requestBuilder.header("Content-Length", contentLength.toString())
        requestBuilder.removeHeader("Transfer-Encoding")
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked")
        requestBuilder.removeHeader("Content-Length")
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", userRequest.url.toHostHeader())
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive")
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    var transparentGzip = false
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true
      requestBuilder.header("Accept-Encoding", "gzip")
    }

    val cookies = cookieJar.loadForRequest(userRequest.url)
    if (cookies.isNotEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies))
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", userAgent)
    }

    val networkResponse = chain.proceed(requestBuilder.build())

    cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

    val responseBuilder = networkResponse.newBuilder()
        .request(userRequest)

    if (transparentGzip &&
        "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
        networkResponse.promisesBody()) {
      val responseBody = networkResponse.body
      if (responseBody != null) {
        val gzipSource = GzipSource(responseBody.source())
        val strippedHeaders = networkResponse.headers.newBuilder()
            .removeAll("Content-Encoding")
            .removeAll("Content-Length")
            .build()
        responseBuilder.headers(strippedHeaders)
        val contentType = networkResponse.header("Content-Type")
        responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
      }
    }

    return responseBuilder.build()
  }

  /** Returns a 'Cookie' HTTP request header with all cookies, like `a=b; c=d`. */
  private fun cookieHeader(cookies: List<Cookie>): String = buildString {
    cookies.forEachIndexed { index, cookie ->
      if (index > 0) append("; ")
      append(cookie.name).append('=').append(cookie.value)
    }
  }
}
           

可以看到,BridgeInterceptor也沒有直接處理請求,而是把開發者的Request對象進行了重新包裝,加上了“Content-Length”等HTTP标準的請求頭資訊。同時BridgeInterceptor也對傳回結果進行了相應的處理。

4.3 CacheInterceptor

CacheInterceptor從類名就可以看出是處理緩存的攔截器

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
    val call = chain.call()
    val cacheCandidate = cache?.get(chain.request())

    val now = System.currentTimeMillis()

    // 計算緩存政策
    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse

    cache?.trackResponse(strategy)
    val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE

    // cacheCandidate不為空但是cacheResponse,緩存的結果已經無效
    if (cacheCandidate != null && cacheResponse == null) {
      // The cache candidate wasn't applicable. Close it.
      cacheCandidate.body?.closeQuietly()
    }

    // 如果請求政策是隻檢索緩存而不允許進行網絡請求,但是本地沒有相應的緩存,則請求失敗,傳回失敗的Response
    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(HTTP_GATEWAY_TIMEOUT)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build().also {
            listener.satisfactionFailure(call, it)
          }
    }

    // 如果不需要網絡請求,直接傳回緩存的結果
    if (networkRequest == null) {
      return cacheResponse!!.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build().also {
            listener.cacheHit(call, it)
          }
    }

    if (cacheResponse != null) { 
      listener.cacheConditionalHit(call, cacheResponse)
    } else if (cache != null) { // 緩存沒有命中
      listener.cacheMiss(call)
    }

    var networkResponse: Response? = null
    try {
      // 真正處理網絡請求的地方
      networkResponse = chain.proceed(networkRequest)
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        cacheCandidate.body?.closeQuietly()
      }
    }

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      // HTTP狀态碼為HTTP_NOT_MODIFIED(304)
      if (networkResponse?.code == HTTP_NOT_MODIFIED) {
        val response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers, networkResponse.headers))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()

        networkResponse.body!!.close()

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache!!.trackConditionalCacheHit()
        cache.update(cacheResponse, response)
        return response.also {
          listener.cacheHit(call, it)
        }
      } else {
        cacheResponse.body?.closeQuietly()
      }
    }

    val response = networkResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build()

    // 處理緩存
    if (cache != null) {
      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        val cacheRequest = cache.put(response)
        return cacheWritingResponse(cacheRequest, response).also {
          if (cacheResponse != null) {
            // This will log a conditional cache miss only.
            listener.cacheMiss(call)
          }
        }
      }

      if (HttpMethod.invalidatesCache(networkRequest.method)) {
        try {
          cache.remove(networkRequest)
        } catch (_: IOException) {
          // The cache cannot be written.
        }
      }
    }

    return response
}
           

這裡邏輯非常清晰,根據緩存政策和本地緩存的有效性來決定是否請求網絡。

  1. 如果請求政策是隻檢索緩存而不允許進行網絡請求,但是本地沒有相應的有效緩存,則請求失敗。
  2. 如果本地有緩存且緩存政策規定不需要網絡請求,直接傳回緩存的結果。
  3. 如果HTTP狀态碼是304,那麼說明本地緩存就是有效的響應資料,直接包裝成Response傳回。
  4. 責任鍊繼續處理,并把本地緩存(如有)包裝到cacheResponse字段,根據緩存政策決定是否更新本地緩存,傳回結果。

接着我們跟進看看這個緩存的政策是怎麼計算的,也就是CacheStrategy.Factory#compute(),此外注意CacheStrategy.Factory的三個構造參數,目前時間、目前請求對象以及本地緩存結果,而CacheStrategy有兩個構造參數,一是網絡請求對象,而是緩存結果對象。

fun compute(): CacheStrategy {
      val candidate = computeCandidate()

      // 本地沒有緩存,但是request中指定隻檢索緩存,那麼可以預測最終結果是請求失敗
      if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) {
        return CacheStrategy(null, null)
      }

      return candidate
}

private fun computeCandidate(): CacheStrategy {
      // 本地沒有緩存
      if (cacheResponse == null) {
        return CacheStrategy(request, null)
      }

      // handshake為空視為緩存無效
      if (request.isHttps && cacheResponse.handshake == null) {
        return CacheStrategy(request, null)
      }

      // 如果該請求不應該存在本地緩存,那麼本地緩存是無效的。如果多次同樣的請求緩存政策是一緻的,那麼這個檢查就沒有必要。
      if (!isCacheable(cacheResponse, request)) {
        return CacheStrategy(request, null)
      }

      // 根據調用者給request設定的緩存政策和請求頭資訊(If-Modified-Since、If-None-Match)判斷是否使用本地緩存
      val requestCaching = request.cacheControl
      if (requestCaching.noCache || hasConditions(request)) {
        return CacheStrategy(request, null)
      }

      val responseCaching = cacheResponse.cacheControl

      val ageMillis = cacheResponseAge()
      var freshMillis = computeFreshnessLifetime()

      if (requestCaching.maxAgeSeconds != -1) {
        freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
      }

      var minFreshMillis: Long = 0
      if (requestCaching.minFreshSeconds != -1) {
        minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
      }

      var maxStaleMillis: Long = 0
      if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
        maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
      }

      // 緩存足夠fresh,不需要網絡請求,隻是用本地緩存
      if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
        val builder = cacheResponse.newBuilder()
        if (ageMillis + minFreshMillis >= freshMillis) {
          builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
        }
        val oneDayMillis = 24 * 60 * 60 * 1000L
        if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
          builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
        }
        return CacheStrategy(null, builder.build())
      }

      // Find a condition to add to the request. If the condition is satisfied, the response body
      // will not be transmitted.
      val conditionName: String
      val conditionValue: String?
      when {
        etag != null -> {
          conditionName = "If-None-Match"
          conditionValue = etag
        }

        lastModified != null -> {
          conditionName = "If-Modified-Since"
          conditionValue = lastModifiedString
        }

        servedDate != null -> {
          conditionName = "If-Modified-Since"
          conditionValue = servedDateString
        }

        else -> return CacheStrategy(request, null) // No condition! Make a regular request.
      }

      val conditionalRequestHeaders = request.headers.newBuilder()
      conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)

      val conditionalRequest = request.newBuilder()
          .headers(conditionalRequestHeaders.build())
          .build()
      return CacheStrategy(conditionalRequest, cacheResponse)
}
           

由此我們可以看到,緩存是由request中cacheControl和HTTP協定本身共同控制的。需要注意的是,OkHttpClient.Builder中的cache預設是null,也就是說預設是不啟用緩存的。你也可以通過以下方式開啟緩存

val client = OkHttpClient.Builder()
    .cache(Cache(File("your cache path"), 10 * 1024 * 1024))
    .build()
           

OkHttp的Cache是LRU Cache,也就是最近最少使用算法實作緩存,具體的實作類是okhttp3.internal.cache.DiskLruCache。

4.4 ConnectInterceptor

ConnectInterceptor,連接配接攔截器,連接配接複用就在這裡。ConnectInterceptor類的代碼尤為簡單

object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
  }
}
           

主要是調用了Realcall#initExchange方法,來看看這個方法做了什麼

internal fun initExchange(chain: RealInterceptorChain): Exchange {
    synchronized(this) {
      check(expectMoreExchanges) { "released" }
      check(!responseBodyOpen)
      check(!requestBodyOpen)
    }

    val exchangeFinder = this.exchangeFinder!!
    val codec = exchangeFinder.find(client, chain)
    val result = Exchange(this, eventListener, exchangeFinder, codec)
    this.interceptorScopedExchange = result
    this.exchange = result
    synchronized(this) {
      this.requestBodyOpen = true
      this.responseBodyOpen = true
    }

    if (canceled) throw IOException("Canceled")
    return result
}
           

主要是做了一些檢查,然後通過exchangeFinder擷取到一個ExchangeCodec對象codec,然後new了一個Exchange對象傳回。

先看.ExchangeFinder#find

fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
  ): ExchangeCodec {
    try {
      val resultConnection = findHealthyConnection(
          connectTimeout = chain.connectTimeoutMillis,
          readTimeout = chain.readTimeoutMillis,
          writeTimeout = chain.writeTimeoutMillis,
          pingIntervalMillis = client.pingIntervalMillis,
          connectionRetryEnabled = client.retryOnConnectionFailure,
          doExtensiveHealthChecks = chain.request.method != "GET"
      )
      return resultConnection.newCodec(client, chain)
    } catch ...
}


@Throws(IOException::class)
private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    while (true) {
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )

      // Confirm that the connection is good.
      if (candidate.isHealthy(doExtensiveHealthChecks)) {
        return candidate
      }

      // If it isn't, take it out of the pool.
      candidate.noNewExchanges()

      // Make sure we have some routes left to try. One example where we may exhaust all the routes
      // would happen if we made a new connection and it immediately is detected as unhealthy.
      if (nextRouteToTry != null) continue

      val routesLeft = routeSelection?.hasNext() ?: true
      if (routesLeft) continue

      val routesSelectionLeft = routeSelector?.hasNext() ?: true
      if (routesSelectionLeft) continue

      throw IOException("exhausted all routes")
    }
}

  /**
   * Returns a connection to host a new stream. This prefers the existing connection if it exists,
   * then the pool, finally building a new connection.
   *
   * This checks for cancellation before each blocking operation.
   */
@Throws(IOException::class)
private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    // 請求已經取消了,直接抛出異常
    if (call.isCanceled()) throw IOException("Canceled")

    // 先檢查call本身是否已經被配置設定了連接配接
    val callConnection = call.connection 
    if (callConnection != null) {
      var toClose: Socket? = null
      synchronized(callConnection) {
        if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
          toClose = call.releaseConnectionNoEvents()
        }
      }

      // 如果已經配置設定連接配接,重用這個連接配接
      if (call.connection != null) {
        check(toClose == null)
        return callConnection
      }

      // 連接配接已經釋放了
      toClose?.closeQuietly()
      eventListener.connectionReleased(call, callConnection)
    }

    // We need a new connection. Give it fresh stats.
    refusedStreamCount = 0
    connectionShutdownCount = 0
    otherFailureCount = 0

    // 從連接配接池擷取一個連接配接
    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
      val result = call.connection!!
      eventListener.connectionAcquired(call, result)
      return result
    }

    // 連接配接池沒有符合條件的連接配接,那麼需要建立新的連接配接

    // 先查找對應的路由
    val routes: List<Route>?
    val route: Route
    if (nextRouteToTry != null) {
      // Use a route from a preceding coalesced connection.
      routes = null
      route = nextRouteToTry!!
      nextRouteToTry = null
    } else if (routeSelection != null && routeSelection!!.hasNext()) {
      // Use a route from an existing route selection.
      routes = null
      route = routeSelection!!.next()
    } else {
      // Compute a new route selection. This is a blocking operation!
      var localRouteSelector = routeSelector
      if (localRouteSelector == null) {
        localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
        this.routeSelector = localRouteSelector
      }
      val localRouteSelection = localRouteSelector.next()
      routeSelection = localRouteSelection
      routes = localRouteSelection.routes

      if (call.isCanceled()) throw IOException("Canceled")

      // Now that we have a set of IP addresses, make another attempt at getting a connection from
      // the pool. We have a better chance of matching thanks to connection coalescing.
      if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
        val result = call.connection!!
        eventListener.connectionAcquired(call, result)
        return result
      }

      route = localRouteSelection.next()
    }

    // 建立連接配接
    val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } finally {
      call.connectionToCancel = null
    }
    call.client.routeDatabase.connected(newConnection.route())

    // If we raced another call connecting to this host, coalesce the connections. This makes for 3
    // different lookups in the connection pool!
    if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
      val result = call.connection!!
      nextRouteToTry = route
      newConnection.socket().closeQuietly()
      eventListener.connectionAcquired(call, result)
      return result
    }

    synchronized(newConnection) {
      connectionPool.put(newConnection)
      call.acquireConnectionNoEvents(newConnection)
    }

    eventListener.connectionAcquired(call, newConnection)
    return newConnection
}
           

4.5 CallServerInterceptor

CallServerInterceptor才是真正向伺服器請求資料的地方,事實上HTTP協定通信的本質也不過是通過Socket進行資料的收發,而HTTP本身有它規定的格式,雙方遵循協定進行資料解析和資料發送就完成了HTTP請求。這裡很多細節我們在App開發中不需要全部了解,是以這裡也不深入去論述了。

class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body
    val sentRequestMillis = System.currentTimeMillis()

    exchange.writeRequestHeaders(request)

    var invokeStartEvent = true
    var responseBuilder: Response.Builder? = null
    if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
        exchange.flushRequest()
        responseBuilder = exchange.readResponseHeaders(expectContinue = true)
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
      if (responseBuilder == null) {
        if (requestBody.isDuplex()) {
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest()
          val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
          requestBody.writeTo(bufferedRequestBody)
        } else {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
          requestBody.writeTo(bufferedRequestBody)
          bufferedRequestBody.close()
        }
      } else {
        exchange.noRequestBody()
        if (!exchange.connection.isMultiplexed) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.
          exchange.noNewExchangesOnConnection()
        }
      }
    } else {
      exchange.noRequestBody()
    }

    if (requestBody == null || !requestBody.isDuplex()) {
      exchange.finishRequest()
    }
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
    }
    var response = responseBuilder
        .request(request)
        .handshake(exchange.connection.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
    var code = response.code
    if (code == 100) {
      // Server sent a 100-continue even though we did not request one. Try again to read the actual
      // response status.
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
      }
      response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      code = response.code
    }

    exchange.responseHeadersEnd(response)

    response = if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response.newBuilder()
          .body(EMPTY_RESPONSE)
          .build()
    } else {
      response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build()
    }
    if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
        "close".equals(response.header("Connection"), ignoreCase = true)) {
      exchange.noNewExchangesOnConnection()
    }
    if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
      throw ProtocolException(
          "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
    }
    return response
  }
}
           

5、總結

OkHttp使用了責任鍊設計模式來處理網絡請求,實際上真正會處理請求的其實隻有CacheInterceptor和CallServerInterceptor。OkHttp本身預設不開啟緩存,但是提供了預設的檔案緩存實作DiskLruCache。另外我們可以實作自定義的攔截器去實作一些定制化的功能,通過OkHttpClient.Builder的addInterceptor和addNetworkInterceptor添加到責任鍊的不同位置。

繼續閱讀