Rx处理服务器请求、缓存的完美封装
总得来说使用
RxJava
也有一段时间了,参考了一些文章以及分享之后得出了个方案。个人觉得还是挺优雅的。
声明:本文有些内容来自:使用RxJava优雅的处理服务器返回异常
本文内容是基于
Retrofit + RxJava
做的一些巧妙的封装。
封装服务器返回数据
我们在做向服务器请求数据的时候,通常返回的是同一格式,比如下面这样:
public class BaseModel<T> implements Serializable {
public String code;
public String msg;
public T data;
public boolean success() {
return code.equals("1");
}
@Override
public String toString() {
return "BaseModel{" +
"code='" + code + '\'' +
", msg='" + msg + '\'' +
", data=" + data +
'}';
}
}
Retrofit
请求接口的
ApiService
如下:
public interface ApiService{
@FormUrlEncoded
@POST("/v1/login?c=login&i=memlogin")
Observable<BaseModel<LoginData>> login(
@Field("phone") String phone,
@Field("password") String password);
}
最基本的使用流程就是,首先判断
code
的值是不是正确(我这里是’1’),如果正确就做对应更新UI或者其他后续操作,如果失败,要根据
msg
字段进行提示用户。基本使用代码如下:
Api.getDefault().login(phone,password)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<BaseModel<LoginData>>(){
@Override
public void call(BaseModel<LoginData> data){
if(data.code.equal("1")){
//TODO 成功
}else if(data.code.equal("")){
//TODO 失败 ,提示 msg
}
}
});
这里写的订阅的时候使用一个
Action1
,这时候如果网络异常导致访问失败,我们还不能用这个简化的
Action1
来做处理,我们需要使用
Subscriber
来重写
onError
方法来进行捕捉,不然会抛出异常。然后基本上每个接口都是这样的处理逻辑。
有没有感觉这样的代码有点脏。作为一个懒惰的程序员,是不会写这么多次重复的代码的。那么我们下面来开始封装干。
我们发现
RxJava
有个操作符叫做
compose
,代码如下:
/**
* 对结果进行预处理
*
* @param <T>
* @return
*/
public static <T> Observable.Transformer<BaseModel<T>, T> handleResult() {
return new Observable.Transformer<BaseModel<T>, T>() {
@Override
public Observable<T> call(Observable<BaseModel<T>> tObservable) {
return tObservable.flatMap(new Func1<BaseModel<T>, Observable<T>>() {
@Override
public Observable<T> call(BaseModel<T> result) {
Track.i("result from network : " + result);
if (result.success()) {
return createData(result.data);
} else {
return Observable.error(new ServerException(result.msg));
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
};
}
/**
* 创建成功的数据
*
* @param data
* @param <T>
* @return
*/
private static <T> Observable<T> createData(T data) {
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
try {
subscriber.onNext(data);
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
});
}
上面的处理,我们使用了一个
Transformer
对象对
Observable
进行转换,可以看到,我这里首先使用
flatMap
操作符把
Obserable<BaseModel<T>>
,转换成为
Observable<T>
,里面具体的处理是首先判断
code
是否为
“1”
。如果成功了,就返回一个
Observable<T>
,把具体的数据发射给订阅者。如果失败了我们就使用
Observable.error()
发送一个异常的可观察者。最后还做了线程的切换
subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
,这样我们外部的调用就简单多了,变成如下代码:
Api.getDefault().login(phone,password)
.compost(RxHelper.handleResult())
.subscribe(new Action1<LoginData>>(){
@Override
public void call(LoginData data){
}
});
这样就简单多了,首先我们把线程处理的操作封装了,其次我们这个订阅的时候只需要关心成功了的数据,并且大家注意到了没有,我们在调用的时候,处理数据的时候就和
BaseModel
无关了,只和它的泛型类型有关,我们关注正确的结果。
那么如果
code
不为
“1”
的话,返回的是一个
Observable.error()
,那么我们同样是需要对它做处理的。那么我们也同样封装一个
Subscriber
,如下:
public abstract class RxSubscribe<T> extends Subscriber<T> {
@Override
public void onNext(T t) {
_onNext(t);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
if (TDevice.getNetworkType() == ) {
_onError("网络不可用");
} else if (e instanceof ServerException) {
_onError(e.getMessage());
} else {
_onError("请求失败,请稍后再试...");
}
}
protected abstract void _onNext(T t);
protected abstract void _onError(String message);
}
好了,简洁而强大,可以看到了,在
onNext
的时候调用了
_onNext()
,在
onError
的时候,会首先判断是不是网络状态不行,然后在判断是不是服务器请求的时候出错了,或者其他
cookie
过期等等异常,这样就把错误信息进行了过滤一遍,最后回调了
_onError()
,外部调用只需要关心打
log
或者
toast
即可。最终的代码是这样的:
Api.getDefault().login(phone, password).compose(RxHelper.handleResult())
.subscribe(new RxSubscribe<LoginData>() {
@Override
protected void _onNext(LoginData data) {
showToast(R.string.login_success);
//TODO login success
}
@Override
protected void _onError(String message) {
showToast(message);
}
});
好了,关于服务器返回数据的基本封装就到这里了。
那么我们来讨论拓展封装,比如我们在请求的时候需要显示一个
Dialog
怎么办?
我们可以这样做,首先在自定义的
Subscribe
中的
onStart
方法中创建
dialog
,并且显示,然后在
onError
和
onComplete()
把
dialog
消失掉。
public abstract class RxSubscribe<T> extends Subscriber<T> {
private Context mContext;
private SweetAlertDialog dialog;
private String msg;
protected boolean showDialog() {
return true;
}
/**
* @param context context
* @param msg dialog message
*/
public RxSubscribe(Context context, String msg) {
this.mContext = context;
this.msg = msg;
}
/**
* @param context context
*/
public RxSubscribe(Context context) {
this(context, "请稍后...");
}
@Override
public void onCompleted() {
if (showDialog())
dialog.dismiss();
}
@Override
public void onStart() {
super.onStart();
if (showDialog()) {
dialog = new SweetAlertDialog(mContext, SweetAlertDialog.PROGRESS_TYPE)
.setTitleText(msg);
dialog.setCancelable(true);
dialog.setCanceledOnTouchOutside(true);
dialog.show();
}
}
@Override
public void onNext(T t) {
_onNext(t);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
if (TDevice.getNetworkType() == ) {
_onError("网络不可用");
} else if (e instanceof ServerException) {
_onError(e.getMessage());
} else {
_onError("请求失败,请稍后再试...");
}
if (showDialog())
dialog.dismiss();
}
protected abstract void _onNext(T t);
protected abstract void _onError(String message);
上面是我完整的代码,定义多了2个构造函数,用来传入
Dialog
需要的
Context
和
Message
。然后为了拓展需要,定义多了一个
showDialog()
方法,假如你需要偷偷摸摸做些事情的话,重写这个方法返回
false
就行了(嘎嘎),这样
dialog
就不会显示了。
处理服务器数据的缓存
通常我们有些数据从服务器取回来之后需要缓存起来,在合适的时候使用。我是想下面这样封装的:
public static <T> Observable<T> load(Context context,
final String cacheKey,
final long expireTime,
Observable<T> fromNetwork,
boolean forceRefresh) {
Observable<T> fromCache = Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
T cache = (T) CacheManager.readObject(context, cacheKey,expireTime);
if (cache != null) {
subscriber.onNext(cache);
} else {
subscriber.onCompleted();
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
/**
* 这里的fromNetwork 不需要指定Schedule,在handleRequest中已经变换了
*/
fromNetwork = fromNetwork.map(new Func1<T, T>() {
@Override
public T call(T result) {
CacheManager.saveObject(context, (Serializable) result, cacheKey);
return result;
}
});
if (forceRefresh) {
return fromNetwork;
} else {
return Observable.concat(fromCache, fromNetwork).first();
}
}
这个方法有几个参数,
context、cacheKey
和
expireTime
都是用了处理缓存的,
fromNetwork
是一个从网络获取数据的
Observable
,
forceRefresh
是指是否强制刷新。
首先我里面做了处理是,从缓存中获取一个
Observable<T>
对象
fromCache
,如果获取不到的话就不发射数据。然后,在
fromNetwork
那里用了一个
map
函数,里面只是做了把网络取回来的数据保存到缓存中。
最后判断是否强制刷新,如果是就直接返回
fromNetwork
,否则就合并
fromCache
和
fromNetwork
,然后取出第一个
Observable
,其实这里的流程是,如果
fromCache
有发射数据就会返回
fromCache
,否者就返回
fromNetwork
。
最后我们调用的时候就这样:
Observable<LoginData> fromNetwork = Api.getDefault()
.login(phone, password)
.compose(RxHelper.handleResult());
RetrofitCache.load(context,cacheKey,L,fromNetwork,false)
.subscribe(new RxSubscribe<LoginData>(context, "登录中") {
@Override
protected void _onNext(LoginData data) {
showToast(R.string.login_success);
//TODO login success
}
@Override
protected void _onError(String message) {
showToast(message);
}
});
这样对于缓存的处理就完毕了。