Rx處理伺服器請求、緩存的完美封裝
總得來說使用
RxJava
也有一段時間了,參考了一些文章以及分享之後得出了個方案。個人覺得還是挺優雅的。
聲明:本文有些内容來自:使用RxJava優雅的處理伺服器傳回異常
本文内容是基于
Retrofit + RxJava
做的一些巧妙的封裝。
封裝伺服器傳回資料
我們在做向伺服器請求資料的時候,通常傳回的是同一格式,比如下面這樣:
public class BaseModel<T> implements Serializable {
public String code;
public String msg;
public T data;
public boolean success() {
return code.equals("1");
}
@Override
public String toString() {
return "BaseModel{" +
"code='" + code + '\'' +
", msg='" + msg + '\'' +
", data=" + data +
'}';
}
}
Retrofit
請求接口的
ApiService
如下:
public interface ApiService{
@FormUrlEncoded
@POST("/v1/login?c=login&i=memlogin")
Observable<BaseModel<LoginData>> login(
@Field("phone") String phone,
@Field("password") String password);
}
最基本的使用流程就是,首先判斷
code
的值是不是正确(我這裡是’1’),如果正确就做對應更新UI或者其他後續操作,如果失敗,要根據
msg
字段進行提示使用者。基本使用代碼如下:
Api.getDefault().login(phone,password)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<BaseModel<LoginData>>(){
@Override
public void call(BaseModel<LoginData> data){
if(data.code.equal("1")){
//TODO 成功
}else if(data.code.equal("")){
//TODO 失敗 ,提示 msg
}
}
});
這裡寫的訂閱的時候使用一個
Action1
,這時候如果網絡異常導緻通路失敗,我們還不能用這個簡化的
Action1
來做處理,我們需要使用
Subscriber
來重寫
onError
方法來進行捕捉,不然會抛出異常。然後基本上每個接口都是這樣的處理邏輯。
有沒有感覺這樣的代碼有點髒。作為一個懶惰的程式員,是不會寫這麼多次重複的代碼的。那麼我們下面來開始封裝幹。
我們發現
RxJava
有個操作符叫做
compose
,代碼如下:
/**
* 對結果進行預處理
*
* @param <T>
* @return
*/
public static <T> Observable.Transformer<BaseModel<T>, T> handleResult() {
return new Observable.Transformer<BaseModel<T>, T>() {
@Override
public Observable<T> call(Observable<BaseModel<T>> tObservable) {
return tObservable.flatMap(new Func1<BaseModel<T>, Observable<T>>() {
@Override
public Observable<T> call(BaseModel<T> result) {
Track.i("result from network : " + result);
if (result.success()) {
return createData(result.data);
} else {
return Observable.error(new ServerException(result.msg));
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
};
}
/**
* 建立成功的資料
*
* @param data
* @param <T>
* @return
*/
private static <T> Observable<T> createData(T data) {
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
try {
subscriber.onNext(data);
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
});
}
上面的處理,我們使用了一個
Transformer
對象對
Observable
進行轉換,可以看到,我這裡首先使用
flatMap
操作符把
Obserable<BaseModel<T>>
,轉換成為
Observable<T>
,裡面具體的處理是首先判斷
code
是否為
“1”
。如果成功了,就傳回一個
Observable<T>
,把具體的資料發射給訂閱者。如果失敗了我們就使用
Observable.error()
發送一個異常的可觀察者。最後還做了線程的切換
subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
,這樣我們外部的調用就簡單多了,變成如下代碼:
Api.getDefault().login(phone,password)
.compost(RxHelper.handleResult())
.subscribe(new Action1<LoginData>>(){
@Override
public void call(LoginData data){
}
});
這樣就簡單多了,首先我們把線程處理的操作封裝了,其次我們這個訂閱的時候隻需要關心成功了的資料,并且大家注意到了沒有,我們在調用的時候,處理資料的時候就和
BaseModel
無關了,隻和它的泛型類型有關,我們關注正确的結果。
那麼如果
code
不為
“1”
的話,傳回的是一個
Observable.error()
,那麼我們同樣是需要對它做處理的。那麼我們也同樣封裝一個
Subscriber
,如下:
public abstract class RxSubscribe<T> extends Subscriber<T> {
@Override
public void onNext(T t) {
_onNext(t);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
if (TDevice.getNetworkType() == ) {
_onError("網絡不可用");
} else if (e instanceof ServerException) {
_onError(e.getMessage());
} else {
_onError("請求失敗,請稍後再試...");
}
}
protected abstract void _onNext(T t);
protected abstract void _onError(String message);
}
好了,簡潔而強大,可以看到了,在
onNext
的時候調用了
_onNext()
,在
onError
的時候,會首先判斷是不是網絡狀态不行,然後在判斷是不是伺服器請求的時候出錯了,或者其他
cookie
過期等等異常,這樣就把錯誤資訊進行了過濾一遍,最後回調了
_onError()
,外部調用隻需要關心打
log
或者
toast
即可。最終的代碼是這樣的:
Api.getDefault().login(phone, password).compose(RxHelper.handleResult())
.subscribe(new RxSubscribe<LoginData>() {
@Override
protected void _onNext(LoginData data) {
showToast(R.string.login_success);
//TODO login success
}
@Override
protected void _onError(String message) {
showToast(message);
}
});
好了,關于伺服器傳回資料的基本封裝就到這裡了。
那麼我們來讨論拓展封裝,比如我們在請求的時候需要顯示一個
Dialog
怎麼辦?
我們可以這樣做,首先在自定義的
Subscribe
中的
onStart
方法中建立
dialog
,并且顯示,然後在
onError
和
onComplete()
把
dialog
消失掉。
public abstract class RxSubscribe<T> extends Subscriber<T> {
private Context mContext;
private SweetAlertDialog dialog;
private String msg;
protected boolean showDialog() {
return true;
}
/**
* @param context context
* @param msg dialog message
*/
public RxSubscribe(Context context, String msg) {
this.mContext = context;
this.msg = msg;
}
/**
* @param context context
*/
public RxSubscribe(Context context) {
this(context, "請稍後...");
}
@Override
public void onCompleted() {
if (showDialog())
dialog.dismiss();
}
@Override
public void onStart() {
super.onStart();
if (showDialog()) {
dialog = new SweetAlertDialog(mContext, SweetAlertDialog.PROGRESS_TYPE)
.setTitleText(msg);
dialog.setCancelable(true);
dialog.setCanceledOnTouchOutside(true);
dialog.show();
}
}
@Override
public void onNext(T t) {
_onNext(t);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
if (TDevice.getNetworkType() == ) {
_onError("網絡不可用");
} else if (e instanceof ServerException) {
_onError(e.getMessage());
} else {
_onError("請求失敗,請稍後再試...");
}
if (showDialog())
dialog.dismiss();
}
protected abstract void _onNext(T t);
protected abstract void _onError(String message);
上面是我完整的代碼,定義多了2個構造函數,用來傳入
Dialog
需要的
Context
和
Message
。然後為了拓展需要,定義多了一個
showDialog()
方法,假如你需要偷偷摸摸做些事情的話,重寫這個方法傳回
false
就行了(嘎嘎),這樣
dialog
就不會顯示了。
處理伺服器資料的緩存
通常我們有些資料從伺服器取回來之後需要緩存起來,在合适的時候使用。我是想下面這樣封裝的:
public static <T> Observable<T> load(Context context,
final String cacheKey,
final long expireTime,
Observable<T> fromNetwork,
boolean forceRefresh) {
Observable<T> fromCache = Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
T cache = (T) CacheManager.readObject(context, cacheKey,expireTime);
if (cache != null) {
subscriber.onNext(cache);
} else {
subscriber.onCompleted();
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
/**
* 這裡的fromNetwork 不需要指定Schedule,在handleRequest中已經變換了
*/
fromNetwork = fromNetwork.map(new Func1<T, T>() {
@Override
public T call(T result) {
CacheManager.saveObject(context, (Serializable) result, cacheKey);
return result;
}
});
if (forceRefresh) {
return fromNetwork;
} else {
return Observable.concat(fromCache, fromNetwork).first();
}
}
這個方法有幾個參數,
context、cacheKey
和
expireTime
都是用了處理緩存的,
fromNetwork
是一個從網絡擷取資料的
Observable
,
forceRefresh
是指是否強制重新整理。
首先我裡面做了處理是,從緩存中擷取一個
Observable<T>
對象
fromCache
,如果擷取不到的話就不發射資料。然後,在
fromNetwork
那裡用了一個
map
函數,裡面隻是做了把網絡取回來的資料儲存到緩存中。
最後判斷是否強制重新整理,如果是就直接傳回
fromNetwork
,否則就合并
fromCache
和
fromNetwork
,然後取出第一個
Observable
,其實這裡的流程是,如果
fromCache
有發射資料就會傳回
fromCache
,否者就傳回
fromNetwork
。
最後我們調用的時候就這樣:
Observable<LoginData> fromNetwork = Api.getDefault()
.login(phone, password)
.compose(RxHelper.handleResult());
RetrofitCache.load(context,cacheKey,L,fromNetwork,false)
.subscribe(new RxSubscribe<LoginData>(context, "登入中") {
@Override
protected void _onNext(LoginData data) {
showToast(R.string.login_success);
//TODO login success
}
@Override
protected void _onError(String message) {
showToast(message);
}
});
這樣對于緩存的處理就完畢了。