-
-
-
- RxJava是什么
- 好在哪?
- 准备知识
- 响应式编程
- 数据流
- 异步
- 小结
- 观察者模式
- 响应式编程
- 解决问题的模型
- map解决的模型
- 创建操作
- 变换操作 f
- 通知操作
- flatMap解决模型
- map解决的模型
- RxJava中主要类介绍
- Observable
- Observer
- 主要类图
- RxJava中的是如何实现两个模型的?
- 创建可观测序列Observable
- 创建变换后的可观测序列
- 创建观察者Observer
- 如何关联被观察者与观察者,形成数据流。
- 模型一需求解决方案
- 需求
- 重贴一下上面的代码
- 源码分析
- 模型二解决方案
- 需求
- 代码
- 源码分析
- 线程控制
- 其他操作符
-
-
在看此文之前建议看下,https://www.jianshu.com/p/9ee9fa13eeef这篇文章,只有图,没有字
RxJava是什么
一个基于观察者模式的异步任务框架
好在哪?
好在用
RxJava
做的异步请求更简明更清晰
举例
需求:在IO线程上执行三个网络请求操作分别为
,
query(A)
,
query(B)
,且
query(C)
依赖于
query(B)
返回的结果,同样
query(A)
依赖于
query(C)
返回的结果
query(B)
用android的异步框架得这么写(伪代码):
Server server = ...;
server.makeRequest(new Query('A'), new Listener(){
onSuccess(boolean b){
if(b){
server.makeRequest(new new Query('B'), new Listener(){
onSuccess(boolean b){
if(b){
server.makeRequest(new Query('C'), new Listener(){
onSuccess(boolean b){
}
})
}
}
})
}
}
})
用Rxjava只需要这么写(伪代码)
Observable.just("A").flatMap((s) -> {
return makeRequest(new Query(s));
}).flatMap((aBoolean) -> {
if(aBoolean) return makeRequest(new Query("B"));
return null;
}).flatMap((aBoolean) -> {
if(aBoolean) return makeRequest(new Query("C"));
return null;
}).subscribeOn(Scheduals.io);
public static Observable<Boolean> makeRequest(Query query){
return Observable.just(query)
.map(new Function<Query, Boolean>() {
@Override
public Boolean apply(Query query) throws Exception {
//TODO
return true;
}
});
}
非常简洁,避免了回调地狱,之后会通过分析源码,去思考能够避免回调地狱的原因
准备知识
响应式编程
响应式编程是一种通过异步和数据流来构建事物关系的编程模型
数据流
是两个事物(在这里我们理解为函数)间关系的桥梁,且只有一个方向,即从上游实体到下游实体。举个例子与
f(x1)
之间如何产生关系?
g(x2)
做为f的输入,当
x1
生成后会通过数据(事件)流通知
f(x1)
执行,这里的
g(x2)
就是上游实体,
f(x1)
就是下游实体。但如果有这样的需求,三个独立的函数
g(x2)
都完成后再通知
f(x1),f(x2),f(x3)
?应该怎样去构建他们的关系?就是我们接下来要讲用异步的方式去构建
g(x2)
异步
数据流不能完全构建出函数之间的关系。如数据流一节所说是相互独立的,他们之间的关系是独立的。这种独立的关系就可以用异步来表示。所以解决上一节的问题便是让
f(x1),f(x2),f(x3)
在各自的线程中执行,完成后再用数据流通知给
f(x1),f(x2),f(x3)
g(x)
小结
异步是为区分无关的事物,数据流是为了联系起有关的事物。那么如何实现数据流传递呢?就用到下面的观察者模式
观察者模式
观察者模式面向的需求是:A对象对B对象的某种变化高度敏感,当B对象发生变化时,A对象需要瞬间做出反应,一般实现观察者模式需要有观察者即A对象,有被观察者
Observer
即B对象,在实现的时候B对象需要持有A对象的引用,这样当B对象发生变化时,B对象才能通过A对象的引用让A对象做出反应,android中的典型实现便是监听器事件,
Observable
是被观察者,
View
是观察者用
OnClickListener
,让
setOnClickListener()
持有
View
的引用,当
OnClickListener
监听到点击事件时便通知
View
进行处理。这样子就简单的实现了数据流从B->A的传递。
OnClickListener
解决问题的模型
RxJava
可以通过很多操作符(就是RxJava中的一些方法)解决许多问题模型, 尽然它是异步任务框架,我们就来看看它是怎么处理异步任务问题模型的,只解释其中两种比较典型的问题模型。
map解决的模型
由模型图可知,首先我们需要创建可观测序列,然后再用观察者模式去通知它的下游实体
map
操作(其实模型中的虚线基本上是由观察者模式和异步实现的),在Map操作完成后形成了另一个可观测序列,在用观察者模式去通知这个序列依次输出。这样的模型可用来解决如下需求:子线程执行一个耗时任务,执行完成后返回给主线程
通过模型图可知,创建操作后需要通知变换操作,这个通知就用观察者模式实现。而变换操作是独立的而且在子线程,所以需要通过异步来实现,且变换操作执行完成后要通知给主线程的。所以也要使用观察者模式
创建操作
如前文所述,创建操作可以看做是一个函数,由于
f(x)
要通知下游的,所以这里的
f(x)
是被观察者,在RxJava里用
f(x)
表示被观察者去发起通知。在RxJava中
Observable
为just,假设这里的输入x为”A”,所以其创建操作为
f
.
Observable.just('A')
变换操作 f
同理这里的变换操作为,需要运行在子线程,要用
map
实现。
Handler
通知操作
而子线程的通知操作也要用观察者模式实现,其需要引用一个观察者,这个观察者需要自己定义,也就是说某个耗时的转换操作在子线程运行完成后,要发送到你自己定义的主线程的观察者中
flatMap解决模型
从模型图我们可以看到,FlatMap里面的数据有两个特点:
- 数据被分成了n个
- 这n个数据也在可观测的序列上
对应的 ,它能解决两个基本需求:
- 原始的单个数据是集合类,比如
List<String>
,
FlatMap
可以把它们变成一个个
String
.
- 这每个String都在可观测序列上,所以也能有通知操作的能力
所以对于第一点他能够简单遍历二维数据,举个例子:需求是遍历所有学生选的课程
final List<Student> list = ...
Disposable disposable = Observable.fromIterable(list)
/*将学生的课程发送出去,从学生实例得到课程实例,再发射出去*/
.flatMap(new Function<Student, ObservableSource<Student.Course>>() {
@Override
public ObservableSource<Student.Course> apply(Student student) throws Exception {
//Log.d(TAG,"flatmap student name = "+student.name);
return Observable.fromIterable(student.getCourseList());
}
})
/*接受到学生的课程*/
.subscribe(new Consumer<Student.Course>() {
@Override
public void accept(Student.Course course) throws Exception {
System.out.printf("Consumer accept course = " + course.getName());
}
});
}
对于第二点,它能够解决的是,网络请求嵌套的问题。举个例子(该例子引用自https://blog.csdn.net/jdsjlzx/article/details/51493552):需求是
queryA
和
queryB
. 并且queryB的运行依赖于queryA的结果
RxJava中主要类介绍
Observable
相当于观察者模式中的被观察者
Observer
相当于观察者模式中的观察者
主要类图
RxJava中的是如何实现两个模型的?
先以
map
为例
创建可观测序列Observable
-
:just
Observable observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9);
创建变换后的可观测序列
-
:map
Observable observable2 = observable.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
//模拟网络请求
Thread.sleep();
return ;
}
})
创建观察者Observer
Consumer consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
//TODO
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
};
如何关联被观察者与观察者,形成数据流。
在RxJava中,subscribe()这里既是订阅,其默认状态也发生了变化. 我们可以用链式调用把他们串起来
上述代码实现了在主线程传递序列,但实际上可以理解为循环了上述序列,但这只是一个同步的实现。而RxJava是一个异步框架,能够很方便的进行线程切换,只需要在合适的位置加上subscrieOn,observeOn即可,接着上面的例子
observable2
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
subscribe(consumer);
该例子实现了
map
操作在子线程运行,然后切换回主线程通知观察者执行. 下面深入具体的源码去分析一下上面这个例子
模型一需求解决方案
需求
请求网络,有结果返回主线程
重贴一下上面的代码
Observable.just(, , , , , , , , )
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
//模拟网络请求
Thread.sleep();
return ;
}
}) .subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
//TODO
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
});
源码分析
- 可观测序列的创建操作
-
:时序图为just(item)
方法解释如下onAssembly
,就是new ObservableFromArray<T>(items)
这个被观察者中保存ObservableFromArray
这个数组items
-
:存储了map()
(map的上游实体)的引用,我们用ObservableFromArray
表示,与ofa_this
对象的引用我们用Function
表示fun1
-
:存储了其上游被观察者subscribeOn()
的引用ObservableMap
和IO调度器map_this
-
:存储了observeOn()
的上游的引用ObservableSubscribeOn
和UI线程调度器sub_this
- 思考:为什么当前对象要存储之前对象所对应的
引用?Observable
因为后面需要用到这些引用去订阅对应的观察者Observer,如下图
-
- 订阅操作:
- 思考:上游如何通知下游呢?
分析模型一可知,因为中间有线程切换操作加数据转换操作,所以数据流必须流经这两个实体。所以如果想让顶层通知到最后的底层的话,必须要经过中间层,让数据流一层一层传递。又根据观察者模式,需要下游的observer订阅上游的observable,才能让数据从上游流向下游。
- 源码中的解决方案 上图步骤1的源码如下,2,3的原理同1:
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { //subscribe(observer)中的observer即是LambdaObserver LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls; } @SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } //ObservableSubscribeOn.java @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); //ObserveOnObserver存储了LambdaObserver的实例ls,且在订阅前操作中存储的ObserveOnObserver订阅被观察者者source source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
- 思考:上游如何通知下游呢?
- 通知操作:RxJava中订阅完成后,因为之前已经订阅过,所以上游可以调用onNext方法直接通知下游 在mapObserver.onNext()操作中会执行如下代码:
-
最后在执行LambdaObserver的onNext中执行消费者函数:@Override public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { actual.onNext(null); return; } U v; try { //回调map中定义的函数,mapper即上文存储的函数 v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } //利用之前保存的actual实例调用其下游的onNext actual.onNext(v); }
@Override public void onNext(T t) { if (!isDisposed()) { try { //可以看到回调了观察者内部定义的函数 onNext.accept(t); } catch (Throwable e) { Exceptions.throwIfFatal(e); get().dispose(); onError(e); } } }
- 另一个结合reforit的例子,一图以蔽之,其中
和bodyObservale
都是callExecuteObservale
中的被观察者,省略了网络请求的创建被观察者创操作,代码如下:reforit
对应的解释图如下:disposable = Network.getGankApi() .getBeauties(, page) .map(new Function<GankBeautyResult, List<Item>>() { @Override public List<Item> apply(GankBeautyResult gankBeautyResult) throws Exception { return null; } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<List<Item>>() { @Override public void accept(@NonNull List<Item> items) throws Exception { swipeRefreshLayout.setRefreshing(false); pageTv.setText(getString(R.string.page_with_number, MapFragment.this.page)); adapter.setItems(items); } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { swipeRefreshLayout.setRefreshing(false); Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show(); } });
模型二解决方案
需求假设有一个数据结构『学生』,要打印出每个学生所需要修的所有课程的名称呢?
代码
源码分析final List<Student> list = ... Disposable disposable = Observable.fromIterable(list) /*将学生的课程发送出去,从学生实例得到课程实例,再发射出去*/ .flatMap(new Function<Student, ObservableSource<Student.Course>>() { @Override public ObservableSource<Student.Course> apply(Student student) throws Exception { //Log.d(TAG,"flatmap student name = "+student.name); return Observable.fromIterable(student.getCourseList()); } }) /*接受到学生的课程*/ .subscribe(new Consumer<Student.Course>() { @Override public void accept(Student.Course course) throws Exception { System.out.printf("Consumer accept course = " + course.getName()); } }); }
- 订阅前的操作
- 订阅操作
- 通知操作
-
通知在订阅操作订阅的观察者fro_this
-MergeObserver
收到通知,执行订阅前操作的里面的MergeObserver
,代码如下,其返回的p是一个fun2
类型,这样就实现了相当于把转换后的数据放入了对应的可观测序列,根据模型二可知,下一步就是要将每个可观测序列中的菱形数据提出来在放入一个Observable
进行输出。这个提出来的过程就是Observable
实现的。可以理解为做了一个subscribeInner
操作。这个过程逻辑有点复杂,因为涉及到并发控制,所以略过。map
@Override public void onNext(T t) { // safeguard against misbehaving sources if (done) { return; } ObservableSource<? extends U> p; try { //执行fun2函数 p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource"); } catch (Throwable e) { Exceptions.throwIfFatal(e); s.dispose(); onError(e); return; } if (maxConcurrency != Integer.MAX_VALUE) { synchronized (this) { if (wip == maxConcurrency) { sources.offer(p); return; } wip++; } } //内部订阅 subscribeInner(p); }
-
线程控制
从模型一中我们可以总计如下:- subscribeOn()切换子线程是在订阅过程中切换的
- observerOn()切换成主线程是在通知的过程中
- 所以上面两个一个操纵订阅过程的线程,一个操纵通知过程的线程,猜测可以产生出任何一种你想要的线程切换功能
- 举个栗子理解一下:
Observable .map // 操作1 .flatMap // 操作2 .subscribeOn(io)//操作3 .map //操作4 .flatMap //操作5 .observeOn(main)//操作6 .map //操作7 .flatMap //操作8 .subscribeOn(io) //操作9 .subscribe(handleData)
我们可以发现,第一个操作9的线程切换没有产生效果,所以总结如下:subscribeOn只能调用一次,因为如果有多次,只会有一次有效果,observeOn()可以多次调用实现了你想要的线程的多次切换。
其他操作符
当然还有其他操作符,留给后面继续讨论,不过掌握了map和flatmap后,后面的有些操作符应该就不会特别难理解,具体的可见官网
- 另一个结合reforit的例子,一图以蔽之,其中