OkHttp的同步與異步請求
- 同步請求
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("http://publicobject.com/helloworld.txt")
.build();
Response response = client.newCall(request).execute();
如果在安卓平台編碼的話,記得要運作在子線程裡面哦。
- 異步請求
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("http://publicobject.com/helloworld.txt")
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.d("OkHttp3", "Call Failed:" + e.getMessage());
}
@Override
public void onResponse(Call call, Response response) throws IOException {
Log.d("OkHttp3", "Call succeeded:" + response.message());
}
});
這樣的寫法在開發中已經遇不到了,各種架構已經把這些操作封裝進去了。隻不過,我既然是回顧,那麼就從原始一點的來。點進去稍稍看看源碼,同步與異步在實作的方式上有啥不同。
-
Call
首先我們會調用
方法。newCall()
@Override
public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
準備在将來某個時候執行網絡服務。
然後點選
execute()
或
enqueue()
都會調到
Call
這個接口。
public interface Call extends Cloneable {
//傳回發起此調用的原始請求。
Request request();
//立即調用請求,并阻塞直到響應可以處理了或正在處理
//錯誤。直接擷取目前的傳回資料Response。使用完畢之後,記得關閉ResponseBody
Response execute() throws IOException;
//異步執行
void enqueue(Callback responseCallback);
...省略代碼
//取消請求。
//已經完成的請求不能被取消。
void cancel();
//建立一個新的、相同的調用,它可以被排隊或執行,即使這個調用已經被執行。
Call clone();
interface Factory {
Call newCall(Request request);
}
}
這個對象表示單個請求/響應對(流),不能執行兩次。接着我們去Call接口的實作類RealCall
...省略N多代碼
@Override
public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
//将任務加入dispatcher
client.dispatcher().executed(this);
//在目前線程中執行Call的getResponseWithInterceptorChain()方法
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
//取目前的傳回資料Response
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
//最終會結束分發
client.dispatcher().finished(this);
}
@Override
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
//使用dispatcher分發給線程池
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
...省略N多代碼
這裡面new了一個匿名内部類:
final class AsyncCall extends NamedRunnable {
//回調
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
//失敗的回調,不在主線程裡面回調
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
//傳回response的回調,也不在主線程裡面
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//最終結束dispatcher();
client.dispatcher().finished(this);
}
}
}
同步請求與異步請求都用到了
dispatcher()
,異步任務交給了線程池來做,下面我們大概的來看一下:
public final class Dispatcher {
//最大的請求數
private int maxRequests = ;
//host請求數最大5
private int maxRequestsPerHost = ;
private int maxRequestsPerHost = ;
/**準備要運作的異步隊列 */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** 運作中的異步隊列,包括取消的還沒有完成的調用。 */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** 運作同步調用隊列。包括取消的還沒有完成的調用。 */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
...省略代碼
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(, Integer.MAX_VALUE, , TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
//如果整個線程池都空閑下來,通知回調線程
if (runningCallsCount == && idleCallback != null) {
idleCallback.run();
}
}
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
//掃描待執行任務的隊列,将任務放入正在執行任務隊列,并執行該任務。
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
...
}
集合隊列
Deque
繼承至
Queue
關于線程池的概念可以上網自行搜尋,這裡簡單标注一下相關的參數概念:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
參數:
corePoolSize - 池中所儲存的線程數,包括空閑線程。
maximumPoolSize - 池中允許的最大線程數。
keepAliveTime - 當線程數大于核心時,此為終止前多餘的空閑線程等待新任務的最長時間。
unit - keepAliveTime 參數的時間機關。
workQueue - 執行前用于保持任務的隊列。此隊列僅保持由 execute 方法送出的 Runnable 任務。
threadFactory - 執行程式建立新線程時使用的工廠。
handler - 由于超出線程範圍和隊列容量而使執行被阻塞時所使用的處理程式。
抛出:
IllegalArgumentException - 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0,或者 corePoolSize 大于 maximumPoolSize。
NullPointerException - 如果 workQueue、threadFactory 或 handler 為 null。
關于
Queue
:
隊列通常(但并非一定)以 FIFO(先進先出)的方式排序各個元素。在處理元素前用于儲存元素的 collection。除了基本Collection操作之外,隊列還提供額外的 insertion, extraction, and inspection 操作。每一種方法以兩種形式存在:如果操作失敗有的會抛出異常,有的會傳回一個特殊值(null或者false,取決于操作)。插入操作的另外一種形式專門用于有容量限制的Queue。
一點點的Interceptor源碼解釋
Interceptor:觀察,修改以及可能有問題的請求輸出和響應請求的回來。通常情況下攔截器用來添加,移除或者轉換請求或者回應的頭部資訊。攔截器接口中有
intercept(Chain chain)
方法,同時傳回Response。
上面我們在分析同步異步的時候都會看到一個方法
getResponseWithInterceptorChain()
:
Response getResponseWithInterceptorChain() throws IOException {
// 建構一個完整的攔截器堆棧。
List<Interceptor> interceptors = new ArrayList<>();
//使用者自己設定的攔截器
interceptors.addAll(client.interceptors());
//當伺服器傳回目前請求需要進行重定向時直接發起新的請求,并在條件允許的情況下複用目前連接配接
interceptors.add(retryAndFollowUpInterceptor);
//處理了cookie,一些報頭字段,壓縮請求
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//處理Cache,
interceptors.add(new CacheInterceptor(client.internalCache()));
//連接配接請求處理
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
//這是鍊中的最後一個攔截器。它對伺服器進行網絡調用。
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, ,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
//将request傳入攔截鍊中傳回
return chain.proceed(originalRequest);
}
接下來看下
RealInterceptorChain
的實作邏輯:
public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
HttpCodec httpCodec, RealConnection connection, int index, Request request, Call call,
EventListener eventListener, int connectTimeout, int readTimeout, int writeTimeout) {
...省略無數代碼
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
...
// 調用下一個攔截鍊
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + , request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
//每個interceptors,都實作了Interceptor接口。是以都要重寫Response intercept(Chain chain)方法。
//index + 1,讓每個攔截鍊有自己的攔截器。
Interceptor interceptor = interceptors.get(index);
//把攔截鍊傳入攔截器中
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + < interceptors.size() && next.calls != ) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
...
return response;
}
}
一個攔截器的intercept方法所執行的邏輯大緻分為三部分:
- 在發起請求前對request進行處理
- 調用下一個攔截器,擷取response
- 對response進行處理,傳回給上一個攔截器
這就是OkHttp攔截器機制的核心邏輯。是以一個網絡請求實際上就是一個個攔截器執行其intercept方法的過程。而這其中除了使用者自定義的攔截器外還有幾個核心攔截器完成了網絡通路的核心邏輯,按照先後順序依次是:
- RetryAndFollowUpInterceptor
- BridgeInterceptor
- CacheInterceptor
- ConnectIntercetot
- CallServerInterceptor
附上一張圖:
上圖與Interceptor的部分摘抄至這個部落格。
下面我貼一下,在實際中添加緩存的例子:
private static OkHttpClient mOkHttpClient;
static {
initOkHttpClient();
}
/**
* 為okhttp添加緩存,如果伺服器不支援緩存時,進而讓okhttp支援緩存。
*/
private static class CacheInterceptor implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {
// 有網絡時 設定緩存逾時時間1個小時
int maxAge = * ;
// 無網絡時,設定逾時為1天
int maxStale = * * ;
Request request = chain.request();
if (CommonUtil.isNetworkAvailable(Context)) {
//有網絡時隻從網絡擷取
request = request.newBuilder().cacheControl(CacheControl.FORCE_NETWORK).build();
} else {
//無網絡時隻從緩存中讀取
request = request.newBuilder().cacheControl(CacheControl.FORCE_CACHE).build();
}
Response response = chain.proceed(request);
if (CommonUtil.isNetworkAvailable(Context)) {
response = response.newBuilder()
.removeHeader("Pragma")
.header("Cache-Control", "public, max-age=" + maxAge)
.build();
} else {
response = response.newBuilder()
.removeHeader("Pragma")
.header("Cache-Control", "public, only-if-cached, max-stale=" + maxStale)
.build();
}
return response;
}
}
private static void initOkHttpClient() {
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
if (mOkHttpClient == null) {
synchronized (RetrofitHelper.class) {
if (mOkHttpClient == null) {
//設定Http緩存
Cache cache = new Cache(new File(Context.getCacheDir(), "HttpCache"), * * );
mOkHttpClient = new OkHttpClient.Builder()
.cache(cache)
.addInterceptor(interceptor)
.addNetworkInterceptor(new CacheInterceptor())
.addNetworkInterceptor(new StethoInterceptor())
.retryOnConnectionFailure(true)
.connectTimeout(, TimeUnit.SECONDS)
.writeTimeout(, TimeUnit.SECONDS)
.readTimeout(, TimeUnit.SECONDS)
.build();
}
}
}
}
基本的用法都會,但是想深究一下實作的過程,就發現自己水準不夠,看不下去了。