天天看点

可能比扔物线写得还好的对RxJava的理解

        • RxJava是什么
        • 好在哪?
        • 准备知识
          • 响应式编程
            • 数据流
            • 异步
            • 小结
          • 观察者模式
        • 解决问题的模型
          • map解决的模型
            • 创建操作
            • 变换操作 f
            • 通知操作
          • flatMap解决模型
        • RxJava中主要类介绍
          • Observable
          • Observer
          • 主要类图
        • RxJava中的是如何实现两个模型的?
          • 创建可观测序列Observable
          • 创建变换后的可观测序列
          • 创建观察者Observer
          • 如何关联被观察者与观察者,形成数据流。
          • 模型一需求解决方案
            • 需求
            • 重贴一下上面的代码
            • 源码分析
          • 模型二解决方案
            • 需求
            • 代码
            • 源码分析
        • 线程控制
        • 其他操作符

在看此文之前建议看下,https://www.jianshu.com/p/9ee9fa13eeef这篇文章,只有图,没有字

RxJava是什么

一个基于观察者模式的异步任务框架

好在哪?

好在用

RxJava

做的异步请求更简明更清晰

举例

需求:在IO线程上执行三个网络请求操作分别为

query(A)

,

query(B)

,

query(C)

,且

query(B)

依赖于

query(A)

返回的结果,同样

query(C)

依赖于

query(B)

返回的结果

用android的异步框架得这么写(伪代码):

Server server = ...;
server.makeRequest(new Query('A'), new Listener(){
        onSuccess(boolean b){
            if(b){
                server.makeRequest(new new Query('B'), new Listener(){
                    onSuccess(boolean b){
                        if(b){
                            server.makeRequest(new Query('C'), new Listener(){
                                onSuccess(boolean b){
                                }
                            })
                        }
                    }
                })
            }   
       }

})
           

用Rxjava只需要这么写(伪代码)

Observable.just("A").flatMap((s) -> {
                return makeRequest(new Query(s));
        }).flatMap((aBoolean) -> {
            if(aBoolean) return makeRequest(new Query("B"));
            return null;
        }).flatMap((aBoolean) -> {
                if(aBoolean) return makeRequest(new Query("C"));
                return null;
        }).subscribeOn(Scheduals.io);
           
public static Observable<Boolean> makeRequest(Query query){
        return Observable.just(query)
                .map(new Function<Query, Boolean>() {
                    @Override
                    public Boolean apply(Query query) throws Exception {
                        //TODO
                        return true;
                    }
                });
    }
           

非常简洁,避免了回调地狱,之后会通过分析源码,去思考能够避免回调地狱的原因

准备知识

响应式编程
响应式编程是一种通过异步和数据流来构建事物关系的编程模型

数据流

是两个事物(在这里我们理解为函数)间关系的桥梁,且只有一个方向,即从上游实体到下游实体。举个例子

f(x1)

g(x2)

之间如何产生关系?

x1

做为f的输入,当

f(x1)

生成后会通过数据(事件)流通知

g(x2)

执行,这里的

f(x1)

就是上游实体,

g(x2)

就是下游实体。但如果有这样的需求,三个独立的函数

f(x1),f(x2),f(x3)

都完成后再通知

g(x2)

?应该怎样去构建他们的关系?就是我们接下来要讲用异步的方式去构建

异步

数据流不能完全构建出函数之间的关系。如数据流一节所说

f(x1),f(x2),f(x3)

是相互独立的,他们之间的关系是独立的。这种独立的关系就可以用异步来表示。所以解决上一节的问题便是让

f(x1),f(x2),f(x3)

在各自的线程中执行,完成后再用数据流通知给

g(x)

小结

异步是为区分无关的事物,数据流是为了联系起有关的事物。那么如何实现数据流传递呢?就用到下面的观察者模式
观察者模式
观察者模式面向的需求是:A对象对B对象的某种变化高度敏感,当B对象发生变化时,A对象需要瞬间做出反应,一般实现观察者模式需要有观察者

Observer

即A对象,有被观察者

Observable

即B对象,在实现的时候B对象需要持有A对象的引用,这样当B对象发生变化时,B对象才能通过A对象的引用让A对象做出反应,android中的典型实现便是监听器事件,

View

是被观察者,

OnClickListener

是观察者用

setOnClickListener()

,让

View

持有

OnClickListener

的引用,当

View

监听到点击事件时便通知

OnClickListener

进行处理。这样子就简单的实现了数据流从B->A的传递。

解决问题的模型

RxJava

可以通过很多操作符(就是RxJava中的一些方法)解决许多问题模型, 尽然它是异步任务框架,我们就来看看它是怎么处理异步任务问题模型的,只解释其中两种比较典型的问题模型。
map解决的模型
可能比扔物线写得还好的对RxJava的理解
由模型图可知,首先我们需要创建可观测序列,然后再用观察者模式去通知它的下游实体

map

操作(其实模型中的虚线基本上是由观察者模式和异步实现的),在Map操作完成后形成了另一个可观测序列,在用观察者模式去通知这个序列依次输出。这样的模型可用来解决如下需求:子线程执行一个耗时任务,执行完成后返回给主线程

通过模型图可知,创建操作后需要通知变换操作,这个通知就用观察者模式实现。而变换操作是独立的而且在子线程,所以需要通过异步来实现,且变换操作执行完成后要通知给主线程的。所以也要使用观察者模式

创建操作

如前文所述,创建操作可以看做是一个函数

f(x)

,由于

f(x)

要通知下游的,所以这里的

f(x)

是被观察者,在RxJava里用

Observable

表示被观察者去发起通知。在RxJava中

f

为just,假设这里的输入x为”A”,所以其创建操作为

Observable.just('A')

.

变换操作 f

同理这里的变换操作为

map

,需要运行在子线程,要用

Handler

实现。

通知操作

而子线程的通知操作也要用观察者模式实现,其需要引用一个观察者,这个观察者需要自己定义,也就是说某个耗时的转换操作在子线程运行完成后,要发送到你自己定义的主线程的观察者中
flatMap解决模型
可能比扔物线写得还好的对RxJava的理解

从模型图我们可以看到,FlatMap里面的数据有两个特点:

- 数据被分成了n个

- 这n个数据也在可观测的序列上

对应的 ,它能解决两个基本需求:

- 原始的单个数据是集合类,比如

List<String>

FlatMap

可以把它们变成一个个

String

.

- 这每个String都在可观测序列上,所以也能有通知操作的能力

所以对于第一点他能够简单遍历二维数据,举个例子:需求是遍历所有学生选的课程

final List<Student> list = ...
 Disposable disposable = Observable.fromIterable(list)
                /*将学生的课程发送出去,从学生实例得到课程实例,再发射出去*/
                .flatMap(new Function<Student, ObservableSource<Student.Course>>() {
  @Override
public ObservableSource<Student.Course> apply(Student student) throws Exception {
                        //Log.d(TAG,"flatmap student name = "+student.name);
                        return Observable.fromIterable(student.getCourseList());
                    }
                })
                /*接受到学生的课程*/
                .subscribe(new Consumer<Student.Course>() {
                    @Override
                    public void accept(Student.Course course) throws Exception {
                        System.out.printf("Consumer accept course = " + course.getName());
                    }

                });
    }
           

对于第二点,它能够解决的是,网络请求嵌套的问题。举个例子(该例子引用自https://blog.csdn.net/jdsjlzx/article/details/51493552):需求是

queryA

queryB

. 并且queryB的运行依赖于queryA的结果

可能比扔物线写得还好的对RxJava的理解
可能比扔物线写得还好的对RxJava的理解

RxJava中主要类介绍

Observable
相当于观察者模式中的被观察者
Observer
相当于观察者模式中的观察者
主要类图
可能比扔物线写得还好的对RxJava的理解

RxJava中的是如何实现两个模型的?

先以

map

为例

创建可观测序列Observable
  • just

    :

    Observable observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9);

创建变换后的可观测序列
  • map

    :
Observable observable2 = observable.map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
                        //模拟网络请求
                        Thread.sleep();
                        return ;
                    }
                })
           
创建观察者Observer
Consumer consumer = new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        //TODO
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {

                    }
                };
           
如何关联被观察者与观察者,形成数据流。
在RxJava中,subscribe()这里既是订阅,其默认状态也发生了变化. 我们可以用链式调用把他们串起来

上述代码实现了在主线程传递序列,但实际上可以理解为循环了上述序列,但这只是一个同步的实现。而RxJava是一个异步框架,能够很方便的进行线程切换,只需要在合适的位置加上subscrieOn,observeOn即可,接着上面的例子

observable2
 .subscribeOn(Schedulers.io())
 .observeOn(AndroidSchedulers.mainThread())
subscribe(consumer);
           

该例子实现了

map

操作在子线程运行,然后切换回主线程通知观察者执行. 下面深入具体的源码去分析一下上面这个例子

模型一需求解决方案

需求

请求网络,有结果返回主线程

重贴一下上面的代码

Observable.just(, , , , , , , , )
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) throws Exception {
                        //模拟网络请求
                        Thread.sleep();
                        return ;
                    }
                }) .subscribeOn(Schedulers.io())
                 .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        //TODO
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {

                    }
                });
           

源码分析

  • 可观测序列的创建操作
    • just(item)

      :时序图为
    这里的

    onAssembly

    方法解释如下

    new ObservableFromArray<T>(items)

    ,就是

    ObservableFromArray

    这个被观察者中保存

    items

    这个数组
    • map()

      :存储了

      ObservableFromArray

      (map的上游实体)的引用,我们用

      ofa_this

      表示,与

      Function

      对象的引用我们用

      fun1

      表示
    • subscribeOn()

      :存储了其上游被观察者

      ObservableMap

      的引用

      map_this

      和IO调度器
    • observeOn()

      :存储了

      ObservableSubscribeOn

      的上游的引用

      sub_this

      和UI线程调度器
    • 思考:为什么当前对象要存储之前对象所对应的

      Observable

      引用?
    因为后面需要用到这些引用去订阅对应的观察者Observer,如下图
    可能比扔物线写得还好的对RxJava的理解
  • 订阅操作:
    • 思考:上游如何通知下游呢?
      分析模型一可知,因为中间有线程切换操作加数据转换操作,所以数据流必须流经这两个实体。所以如果想让顶层通知到最后的底层的话,必须要经过中间层,让数据流一层一层传递。又根据观察者模式,需要下游的observer订阅上游的observable,才能让数据从上游流向下游。
    • 源码中的解决方案
      可能比扔物线写得还好的对RxJava的理解
      上图步骤1的源码如下,2,3的原理同1:
      public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
        //subscribe(observer)中的observer即是LambdaObserver
          LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
          subscribe(ls);
          return ls;
      }
       @SchedulerSupport(SchedulerSupport.NONE)
      @Override
      public final void subscribe(Observer<? super T> observer) {
          observer = RxJavaPlugins.onSubscribe(this, observer);
              ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
              subscribeActual(observer);
      }
      //ObservableSubscribeOn.java
      @Override
      protected void subscribeActual(Observer<? super T> observer) {
          if (scheduler instanceof TrampolineScheduler) {
              source.subscribe(observer);
          } else {
              Scheduler.Worker w = scheduler.createWorker();
              //ObserveOnObserver存储了LambdaObserver的实例ls,且在订阅前操作中存储的ObserveOnObserver订阅被观察者者source
              source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
          }
      }
                 
  • 通知操作:RxJava中订阅完成后,因为之前已经订阅过,所以上游可以调用onNext方法直接通知下游
    可能比扔物线写得还好的对RxJava的理解
    在mapObserver.onNext()操作中会执行如下代码:
  • @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != NONE) {
                    actual.onNext(null);
                    return;
                }
    
                U v;
    
                try {
    //回调map中定义的函数,mapper即上文存储的函数
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
    //利用之前保存的actual实例调用其下游的onNext
                actual.onNext(v);
            }
               
    最后在执行LambdaObserver的onNext中执行消费者函数:
    @Override
        public void onNext(T t) {
            if (!isDisposed()) {
                try {
    //可以看到回调了观察者内部定义的函数
                    onNext.accept(t);
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    get().dispose();
                    onError(e);
                }
            }
        }
               
    • 另一个结合reforit的例子,一图以蔽之,其中

      bodyObservale

      callExecuteObservale

      都是

      reforit

      中的被观察者,省略了网络请求的创建被观察者创操作,代码如下:
    disposable = Network.getGankApi()
                    .getBeauties(, page)
                    .map(new Function<GankBeautyResult, List<Item>>() {
                        @Override
                        public List<Item> apply(GankBeautyResult gankBeautyResult) throws Exception {
                            return null;
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<List<Item>>() {
                        @Override
                        public void accept(@NonNull List<Item> items) throws Exception {
                            swipeRefreshLayout.setRefreshing(false);
                            pageTv.setText(getString(R.string.page_with_number, MapFragment.this.page));
                            adapter.setItems(items);
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            swipeRefreshLayout.setRefreshing(false);
                            Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
                        }
                    });
               
    对应的解释图如下:
    可能比扔物线写得还好的对RxJava的理解
    模型二解决方案
    需求
    假设有一个数据结构『学生』,要打印出每个学生所需要修的所有课程的名称呢?
    代码
    final List<Student> list = ...
     Disposable disposable = Observable.fromIterable(list)
                    /*将学生的课程发送出去,从学生实例得到课程实例,再发射出去*/
                    .flatMap(new Function<Student, ObservableSource<Student.Course>>() {
      @Override
    public ObservableSource<Student.Course> apply(Student student) throws Exception {
                            //Log.d(TAG,"flatmap student name = "+student.name);
                            return Observable.fromIterable(student.getCourseList());
                        }
                    })
                    /*接受到学生的课程*/
                    .subscribe(new Consumer<Student.Course>() {
                        @Override
                        public void accept(Student.Course course) throws Exception {
                            System.out.printf("Consumer accept course = " + course.getName());
                        }
    
                    });
        }
               
    源码分析
    • 订阅前的操作
      可能比扔物线写得还好的对RxJava的理解
    • 订阅操作
      可能比扔物线写得还好的对RxJava的理解
    • 通知操作
      • fro_this

        通知在订阅操作订阅的观察者

        MergeObserver

        -

        MergeObserver

        收到通知,执行订阅前操作的里面的

        fun2

        ,代码如下,其返回的p是一个

        Observable

        类型,这样就实现了相当于把转换后的数据放入了对应的可观测序列,根据模型二可知,下一步就是要将每个可观测序列中的菱形数据提出来在放入一个

        Observable

        进行输出。这个提出来的过程就是

        subscribeInner

        实现的。可以理解为做了一个

        map

        操作。这个过程逻辑有点复杂,因为涉及到并发控制,所以略过。
      @Override
            public void onNext(T t) {
              // safeguard against misbehaving sources
              if (done) {
                  return;
              }
              ObservableSource<? extends U> p;
              try {
              //执行fun2函数
                  p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
              } catch (Throwable e) {
                  Exceptions.throwIfFatal(e);
                  s.dispose();
                  onError(e);
                  return;
              }
              if (maxConcurrency != Integer.MAX_VALUE) {
                  synchronized (this) {
                      if (wip == maxConcurrency) {
                          sources.offer(p);
                          return;
                      }
                      wip++;
                  }
              }
              //内部订阅
              subscribeInner(p);
                }
                 

    线程控制

    从模型一中我们可以总计如下:
    • subscribeOn()切换子线程是在订阅过程中切换的
    • observerOn()切换成主线程是在通知的过程中
    • 所以上面两个一个操纵订阅过程的线程,一个操纵通知过程的线程,猜测可以产生出任何一种你想要的线程切换功能
    • 举个栗子理解一下:
      Observable
      .map                    // 操作1
      .flatMap                // 操作2
      .subscribeOn(io)//操作3
      .map       //操作4             
      .flatMap   //操作5             
      .observeOn(main)//操作6
      .map             //操作7       
      .flatMap               //操作8
      .subscribeOn(io)        //操作9
      .subscribe(handleData)
                 
    上述操作简单画图如下
    可能比扔物线写得还好的对RxJava的理解
    我们可以发现,第一个操作9的线程切换没有产生效果,所以总结如下:subscribeOn只能调用一次,因为如果有多次,只会有一次有效果,observeOn()可以多次调用实现了你想要的线程的多次切换。

    其他操作符

    当然还有其他操作符,留给后面继续讨论,不过掌握了map和flatmap后,后面的有些操作符应该就不会特别难理解,具体的可见官网