天天看点

RxJava2从入门到精通RxJava是什么及RxJava优势RxJava原理 RxJava使用(创建、装配、消费) RxJava的一些扩展

目录

RxJava是什么及RxJava优势

RxJava原理

RxJava使用(创建、装配、消费)

1、创建

Create

Just

From

Interval & IntervalRange

Range & RangeLong

defer

timer

Empty & Never & Error

2、装配阶段

转换操作符--lift()

转换操作符二

转换操作符三

转换操作符四

转换操作符五

线程调度

筛选操作符

组合Observable操作符

错误处理

异步操作符

公用操作符

数学运算符

聚合操作符

公用操作符三—Do系列

Subject

Processor

关于背压

RxJava的一些扩展

总结

RxJava是什么及RxJava优势

RxJava 并不是一们新语言,RxJava 是一个ReactiveX 在JVM平台上的实现:是一个通过使用可观察序列来编写异步和基于事件的程序的库。

RxJava 最大的好处就是:在异步处理中,它能让线程调度变得非常简单。

其次就是链式调用,它能让一个业务流更清晰,可读性更强。

最后我们把链式调用和线程调度合并起来用,那个编码体现简直不要太爽。

举个例子--找美女的影评

1、异步拉取某电影的评论

2、过滤掉不是女性的评论

3、按用户等级从高到低排序

4、将美女的头像和昵称在列表里显示出来

private void loadData(String url) {
        Observable.just(url)
                .observeOn(Schedulers.io())
                .map(new Function<String, Response>() {
                    @Override
                    public Response apply(String mS) throws Exception {
                        System.out.println(Thread.currentThread().getName() + "---" + mS);
                        Response response = null;

                        URL url = new URL(mS);
                        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                        conn.setRequestMethod("GET");
                        conn.setReadTimeout(5000);
                        if (conn.getResponseCode() == 200) {

                            InputStream in = conn.getInputStream();
                            byte[] b = new byte[1024 * 512];
                            ByteArrayOutputStream baos = new ByteArrayOutputStream();
                            int len = 0;
                            while ((len = in.read(b)) > -1) {
                                baos.write(b, 0, len);
                            }
                            String msg = baos.toString();
                            response = new Gson().fromJson(msg, Response.class);
                        }
                        System.out.println(Thread.currentThread().getName() + "-response--" + response);
                        return response;
                    }
                })
                .flatMap(new Function<Response, ObservableSource<User>>() {
                    @Override
                    public ObservableSource<User> apply(Response mResponse) throws Exception {
                        return Observable.fromIterable(mResponse.getCmts());
                    }
                })
                .filter(new Predicate<User>() {
                    @Override
                    public boolean test(User mUser) throws Exception {
                        return mUser.gender == 2;
                    }
                })
                .toSortedList(new Comparator<User>() {
                    @Override
                    public int compare(User o1, User o2) {
                        return o2.getUserLevel() - o1.getUserLevel();
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe(new SingleObserver<List<User>>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onSuccess(List<User> mUsers) {
                        System.out.println(Thread.currentThread().getName() + "---" + mUsers);
                        mUserAdapter.setNewData(mUsers);
                    }

                    @Override
                    public void onError(Throwable e) {
                    }
                });
    }
           

举个例子--发布一个图文贴子

1、文字检查(假设也是耗时的)

2、先裁剪压缩图片,

3、向云服务申请上传授权

4、上传到云服务,拿到云地址

5、云地址加文本一起走发布接口

RxJava2从入门到精通RxJava是什么及RxJava优势RxJava原理 RxJava使用(创建、装配、消费) RxJava的一些扩展

1、2、3并行执行,2和3都完成后执行4,1和4都完成后执行5

如果按照我们平常的套路去写这里的逻辑你会发现你要做很多的判断,并且调用链也比较复杂。看一下RxJava怎么去实现。

public static void main(String... arg) {
        upload("localPath", "发一个简单的贴子");
        try {
            Thread.sleep(15 * 1000);
        } catch (InterruptedException mE) {
            mE.printStackTrace();
        }
    }

    private static void upload(String localPath, String content) {
        Observable.zip(checkContent(content), uploadImage(localPath), new BiFunction<String, String, String>() {
            @Override
            public String apply(String mS, String mS2) throws Exception {
                System.out.println("现在开始分布--" + Thread.currentThread().getName());
                Thread.sleep(2000);
                return "发布成功";
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String mS) {
                        System.out.println(mS + "--" + Thread.currentThread().getName());
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("-Throwable-" + e.getMessage() + "--" + Thread.currentThread().getName());
                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }


    private static Observable<String> uploadImage(String localPath) {
        return Observable.zip(Observable.just(localPath).observeOn(Schedulers.io()).map(new Function<String, byte[]>() {
            @Override
            public byte[] apply(String path) throws Exception {
                //裁剪压缩后返回byte
                System.out.println("裁剪压缩图片--" + Thread.currentThread().getName());
                Thread.sleep(3000);
                System.out.println("裁剪压缩图片成功--" + Thread.currentThread().getName());
                return new byte[0];
            }
        }), Observable.just("token").observeOn(Schedulers.io()).map(new Function<String, String>() {
            @Override
            public String apply(String mS) throws Exception {
                System.out.println("拿云端地址--" + Thread.currentThread().getName());
                Thread.sleep(1000);
                System.out.println("拿云端地址成功--" + Thread.currentThread().getName());
                return "http://www.baidu.com";
            }
        }), new BiFunction<byte[], String, String>() {
            @Override
            public String apply(byte[] mBytes, String mS) throws Exception {
                System.out.println("压缩成功,拿云端地址都成功,现在上传--" + Thread.currentThread().getName());
                Thread.sleep(3000);
                System.out.println("现在上传成功,返回云端地址--" + Thread.currentThread().getName());
                return mS;
            }
        });
    }


    private static Observable<String> checkContent(String content) {
        return Observable.just(content).flatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String mS) throws Exception {
                System.out.println("文案检查开始--" + Thread.currentThread().getName());
                Thread.sleep(1000);
                if (null == mS || mS.length() <= 0) {
                    System.out.println("文案检查失败--" + Thread.currentThread().getName());
                    throw new NullPointerException("content can not be null !");
                }
                System.out.println("文案检查成功--" + Thread.currentThread().getName());
                return Observable.just(mS);
            }
        });
    }
           

RxJava原理

三个基本的元素

RxJava是基于观察者模式的,首先我们认识一下RxJava 三个基本的元素

被观察者(Observable)

观察者(Observer)

订阅(subscribe)

三者关系:

Observable. Subscribe(Observer)

三个基本事件

onNext() 发送该事件时,观察者会回调 onNext() 方法

onError() 发送该事件时,观察者会回调 onError() 方法,当发送该事件之后,其他事件将不会继续发送

onComplete() 发送该事件时,观察者会回调 onComplete() 方法,当发送该事件之后,其他事件将不会继续发送

另外

onSubscribe(Disposable d); 在注册时返回一个Disposable可以用于反注册

Observable5个基类简述

Observable 0-n的数据流,不支持背压

Flowable  0-n的数据流  支持响应流和背压

Single 一个错误或者一个item流

Completable   只有错误信号和完成信号

Maybe 没有item,恰好一个item或者一个错误

有了这几个基础之后我们开始尝试自己写一个小例子。

public static void create() {
        //被观察者
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                //我是一个发射器,用来发射事件给观察者,
                // onComplete和onError有且只能有一个,都会结束,后面的事件都观察不到
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onComplete();
                if (emitter.isDisposed()) {

                } else {
                    emitter.onError(new Throwable("err"));
                }
            }
        });

        //观察者
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(String mS) {
                System.out.println("onNext-" + mS);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };

        //绑定
        observable.subscribe(observer);
    }
           

我们一步步看一下源码

1、创建。做了一些判断,最后为返回一个ObservableCreate实例

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
           

2、创建一个观察者,就是实现Observer<T>接口

3、绑定。ObservableCreate.subscribe(Observer),最终走ObservableCreate.subscribeActual(Observer);

//订阅过程
 public final void subscribe(Observer<? super T> observer) {
       //做了一堆空判断,勾子的处理等,最后会走这个方法
        subscribeActual(observer);
    }

//我们看一下ObservableCreate的subscribeActual方法
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        //create的入参ObservableOnSubscribe接口,用于回调发射器
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //实例化一个发射器,发射器持有观察者
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        //把发射器通过create传进来的接口回调过去
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
}

           

4、发射事件

//发射事件 :发射器next一个事件的时候会调用观察者的next()方法,观察者就可以接收到事件
 @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
           

经过上边的4步我们已经理解了RxJava是如何实例观察者模式的。

还有一个最最重要的还没有提到,那就是线程调度的问题。我们看到订阅的时候都会被一个

这个我们在讲完转换之后展开。

RxJava使用(创建、装配、消费)

1、创建

前面我们用Observable的create()方法创建了一个被观察者,要传入一个ObservableOnSubscribe接口在订阅之后拿到一个发射器ObservableEmitter再去手动发射数据。

Create

create():创建一个自定义的Obserable,需要自己实现onNext()、onError(),onComplete()的逻辑;

RxJava提供了很多个更简单的创建操作符。

Just

适用于:Flowable,Observable,Maybe,Single

作用:构造一个响应类型通过拿一个预先存在的对象且在订阅时将该对象发射给下游消费者。

它还有许多个重载的版本,区别在于接受的参数的个数不同,最少1个,最多10个

public static void just() {
        Observable<String> just = Observable.just("1111", "2222", "3333");
        just.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String mS) {
                System.out.println(mS);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    }
           

From

fromIterable

适用于: Flowable,Observable

作用:遍历集合,逐个发事件

fromArray

适用于: Flowable,Observable

作用:遍历数组,逐个发事件

fromCallable

适用于:Flowable,Observable,Maybe,Single,Completable

作用:传入一个Callable,在订阅时执行call方法,把结果通过DeferredScalarDisposable(订阅时观察者的onSubscribe方法会回            调该实例)的complet()方法回传

fromAction 

适用于: Maybe,Completable

作用:传入一个Action,在订阅时执行run方法。

fromRunnable  

适用于: Maybe,Completable

作用:传入一个Runnable,在订阅时执行run方法。

fromFuture

适用于: Flowable,Observable,Maybe,Single,Completable

作用:接收一个Future对象,会同步等待Future返回的结果再发送数据,也可以设置等待超时时间

Interval & IntervalRange

适用于: Flowable,Observable

作用:周期性生成一个无限的、永远增长的数(长整型). IntervalRange 变体生成一个有限数量的数。

Range & RangeLong

适用于: Flowable,Observable

作用:发送一定范围的事件序列。 Range()函数 生成 Integer ,rangeLong() 生成 Long.

defer

适用于:  Flowable,Observable,Maybe,Single,Completable

作用:直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable。

defer操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个Observable。

timer

适用于: Flowable,Observable,Maybe,Single,Completable

作用:创建一个在给定的时间段之后返回一个特殊值的Observable,它在延迟一段给定的时间后发射一个简单的数字0

Empty & Never & Error

适用于:  Flowable,Observable,Maybe,Completable

empty:这种类型原在订阅时立马发出完成的信号。

never:这种类型原不发出onNext,onSuccess,onError或者onComplete信号。 这种类型的响应源 在测试或者在组合操作符中禁用确切的源非常有用

error:对消费者发出一个错误信号,要么已有,要么通过一个java.util.concurrent.Callable 生成。

2、装配阶段

装配阶段主要用到之前提到的5个基类的操作符,由于操作符较多,我根据文档给大家重点选了几类。

  1. 转换操作符
  2. 组合Observable操作符
  3. 异步和阻塞操作符
  4. 公用操作符
  5. 数学和集合操作符

转换操作符--lift()

lift()

Lift方法是RxJava中所有操作符的基础,可以通过它做各种各样的变化。弄清楚它的原理,也方便我们理解其他操作符

理解起来就两个字----代理。

//由观察String转换到观察Student
    public static void lift(final String name) {
        Observable<String> nameObservable = Observable.just(name);
        Observable<Student> observable = nameObservable.lift(new ObservableOperator<Student, String>() {
            @Override
            public Observer<String> apply(final Observer<? super Student> observer) throws Exception {
                Observer<String> nameObserver = new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {}

                    @Override
                    public void onNext(String mS) {
                        observer.onNext(new Student(name, 18));
                    }

                    @Override
                    public void onError(Throwable e) {}

                    @Override
                    public void onComplete() {}

                };
                return nameObserver;
            }
        });
    }
           
RxJava2从入门到精通RxJava是什么及RxJava优势RxJava原理 RxJava使用(创建、装配、消费) RxJava的一些扩展

1、创建一个观察源为String类型的Observable<String>

2、实现ObservableOperator接口

public interface ObservableOperator<Downstream, Upstream> {
   //传入一个观察者Downstream,这个观察者是我们订阅的观察者,对应的类型是Student
   //返回一个观察者Upstream,这个观察者要我们自己生成,对应的类型是源始的类型String,Upstream持有Downstream
   //Upstream走Next的时候的拿到一个类型是String的事件,我们用这个String生成对应的Student事件再调用Downstream的Next方法
    @NonNull
    Observer<? super Upstream> apply(@NonNull Observer<? super Downstream> observer) throws Exception;
}
           

3、lift()生成一个新的ObservableLift<Student, String>,它持有Observable<String>和ObservableOperator

4、ObservableLift<Student, String>.subscribe(Observer<Student> downstream)

5、ObservableLift<Student, String>.subscribeActual(Observer<Student> downstream)

     Observer<String> upstream= ObservableOperator.apply(downstream)

      source.subscribe(upstream)

6、upstream.next(String)-->downstream.next(new Student(String))

本来被观察者Observable<String>应该被downstream观察的,但是他们两的类型不相同,所以中间实现了另外一个观察者upstream,他和观察源的类型保持了一致。在中间层的观察者upstream拿到事件后,做了一层转换,最后回调给downstream。

明白了转换之后我们就大概猜到线程切换是怎么做的了。

转换操作符二

Map操作符对原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable

Cast操作符将原始Observable发射的每一项数据都强制转换为一个指定的类型(多态),然后再发射数据,它是map的一个特殊版本:

适用于一对一转换,当然也可以配合flatmap进行适用

private static void map() {
        Observable.just("小明", "小红").map(new Function<String, Student>() {
            @Override
            public Student apply(String mS) throws Exception {
                return new Student(mS, 18);
            }
        }).subscribe(new Consumer<Student>() {
            @Override
            public void accept(Student mStudent) throws Exception {
                System.out.println(mStudent);
            }
        });
    }
           

转换操作符三

flatMap:将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里。需要注意的是, flatMap并不保证事件的顺序。

concatMap:concatMap和flatMap一样,区别在于能保证顺序

flatMapIterable:将上流的任意一个元素转换成一个Iterable对象。

多用于一对多,多对多。

/**
     * 无序
     */
    private static void test() {
        Observable.range(1,10).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("I am value " + integer);
                }
                return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    }
           

转换操作符四

buffer

用于将整个流按数量进行分组

private static void buffer() {
        Observable.range(1, 10).buffer(3).subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> mIntegers) throws Exception {
                System.out.println(mIntegers);
            }
        });
    }
结果:
[1, 2, 3]
[4, 5, 6]
[7, 8, 9]
[10]
           

groupBy

将整个流按条件分组

private static void groupBy() {
        Observable.range(1, 5).groupBy(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer mInteger) throws Exception {
                return mInteger % 2;
            }
        }).subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
            @Override
            public void accept(GroupedObservable<Integer, Integer> mIntegerIntegerGroupedObservable) throws Exception {
                if (mIntegerIntegerGroupedObservable.getKey() == 0) {
                    mIntegerIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer mInteger) {
                            System.out.println("双-" + mInteger);
                        }
                    });
                } else {
                    mIntegerIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer mInteger) {
                            System.out.println("单-" + mInteger);
                        }
                    });
                }
            }
        });
    }
结果:
单-1
双-2
单-3
双-4
单-5
           

Scan

对原始Observable发射的每一项数据(第一项除外)都应用一个函数,计算出函数的结果z值,并将该值填 充回可观测序列,等待和下一次发射的数据一起使用,比如做累加,或者找最大最小值

private static void scan() {
        Observable.range(1, 5).scan(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer mInteger, Integer mInteger2) throws Exception {
                System.out.println("mInteger--" + mInteger + "--mInteger2--" + mInteger2);
                return (mInteger > mInteger2 ? mInteger : mInteger2);
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer mInteger) throws Exception {
                System.out.println(mInteger);
            }
        });
    }
结果:
1
mInteger--1--mInteger2--2
2
mInteger--2--mInteger2--3
3
mInteger--3--mInteger2--4
4
mInteger--4--mInteger2--5
5
           

window

window与buffer区别:window是把数据分割成了Observable,buffer是把数据分割成List

private static void window() {
        Observable.range(1, 10).window(3).subscribe(new Consumer<Observable<Integer>>() {
            @Override
            public void accept(Observable<Integer> mIntegerObservable) throws Exception {
                System.out.println(mIntegerObservable);
                mIntegerObservable.subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer mInteger) throws Exception {
                        System.out.println(mInteger);
                    }
                });
            }
        });
    }
结果:
[email protected]
1
2
3
[email protected]
4
5
6
[email protected]
7
8
9
[email protected]
10
           

(Observer<R> old)

转换操作符五

As

可以将一种类型的Observable转换成任意类型

public interface ObservableConverter<T, R> {
    @NonNull
    R apply(@NonNull Observable<T> upstream);
}

Observable.just("18").as(new ObservableConverter<String, Observable<Student>>() {
            @Override
            public Observable<Student> apply(Observable<String> upstream) {
                return Observable.just(new Student("小明", 18));
            }
        }).subscribe(new Consumer<Student>() {
            @Override
            public void accept(Student mStudent) throws Exception {

            }
        });
           

Compose

可以通过它将一种类型的Observable转换成另一种类型的Observable

public interface ObservableTransformer<Upstream, Downstream> {
    @NonNull
    ObservableSource<Downstream> apply(@NonNull Observable<Upstream> upstream);
}      
     
 Observable.just("18").compose(new ObservableTransformer<String, Student>() {
            @Override
            public ObservableSource<Student> apply(Observable<String> upstream) {
                return Observable.just(new Student("小明", 18));
            }
        }).subscribe(new Consumer<Student>() {
            @Override
            public void accept(Student mStudent) throws Exception {

            }
        });
           

那么as,compose与map  flatMap比较?

不同点在于compose()操作符拥有更高层次的抽象概念:它操作于整个数据流中,不仅仅是某一个被发送的事件。具体如下:

compose()是一个能够从数据流中得到原始Observable<T>的操作符,所以,那些需要对整个数据流产生作用的操作(比如,subscribeOn()和observeOn())需要使用compose()来实现。相较而言,如果在flatMap()中使用subscribeOn()或者observeOn(),那么它仅仅对在flatMap()中创建的Observable起作用,而不会对剩下的流产生影响。

当创建Observable流的时候,compose()会立即执行,犹如已经提前写好了一个操作符一样,而flatMap()则是在onNext()被调用后执行,onNext()的每一次调用都会触发flatMap(),也就是说,flatMap()转换每一个事件,而compose()转换的是整个数据流。

因为每一次调用onNext()后,都新建一个Observable,所以flatMap()的效率较低。事实上,compose()操作符只在主干数据流上执行操作。

线程调度

subscribeOn

指定被观察者生产事件的线程,多次调用只有第一次生效

observeOn

指定观察响应事件的线程,多次切换都生效

RxJava内置了多个高度器

1、Schedulers.immediate()  当前线程 = 不指定线程  默认 

2、AndroidSchedulers.mainThread()  Android主线程  操作UI 

3、Schedulers.newThread()  常规新线程  耗时等操作 

4、Schedulers.io()  io操作线程  网络请求、读写文件等io密集型操作 

5、Schedulers.computation()  CPU计算操作线程  大量计算操作

Observable.just(new Student("小明", 18))
                .observeOn(Schedulers.io())
                .subscribeOn(Schedulers.io())
                .map(new Function<Student, String>() {
                    @Override
                    public String apply(Student mStudent) throws Exception {
                        System.out.println(Thread.currentThread().getName());
                        return mStudent.name;
                    }
                })
                .observeOn(Schedulers.newThread())
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String mS) throws Exception {
                        System.out.println(Thread.currentThread().getName());
                        return 100;
                    }
                })
                .observeOn(Schedulers.io())
                .observeOn(Schedulers.computation())
                .subscribeOn(Schedulers.computation())
                .subscribeOn(Schedulers.newThread())
                .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer mInteger) throws Exception {
                System.out.println(Thread.currentThread().getName());
            }
        });
结果:
RxCachedThreadScheduler-2
RxNewThreadScheduler-2
RxComputationThreadPool-2
           

原理:

前面我们看了lift()操作符的原理,我们简单看一下ObserveOn的源码。

1、ObserveOn会返回一个ObservableObserveOn的新的被观察者

2、订阅时还是和lift()一样的套路,在subscribeActual里生成了一个中间层的观察者ObserveOnObserver。

public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}
           

筛选操作符

filter

用来根据指定的规则对源进行过滤

private static void fitter() {
        Observable.range(1, 30).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer mInteger) throws Exception {
                return mInteger % 10 == 0;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer mInteger) throws Exception {
                System.out.println(mInteger);
            }
        });
    }
结果:
10
20
30
           

elementAt & firstElement & lastElement

用来获取指定位置的数据

private static void element() {
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            list.add(i);
        }
        Observable.fromIterable(list).elementAt(18).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer mInteger) throws Exception {
                System.out.println(mInteger);
            }
        });
    }
结果:
18
           

distinct & distinctUntilChanged

前进用来对源中的重复数据进行过滤,后者只当相邻的两个元素相同的时候才会将它们过滤掉

Observable.range(1, 10).map(new Function<Integer, Integer>() {
            @Override
            public Integer apply(Integer mInteger) throws Exception {
                return mInteger % 5;
            }
        }).distinct().subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer mInteger) throws Exception {
                System.out.println(mInteger);
            }
        });
结果:
1
2
3
4
0

 //重复的条件
        Observable.just(new Student("小明", 19), new Student("小明", 10), new Student("小红", 18), new Student("小明", 10)).distinctUntilChanged(new Function<Student, String>() {
            @Override
            public String apply(Student mStudent) throws Exception {
                return mStudent.name;
            }
        }).subscribe(new Consumer<Student>() {
            @Override
            public void accept(Student mStudent) throws Exception {
                System.out.println(mStudent);
            }
        });
结果
Student{name='小明', age=19}
Student{name='小红', age=18}
Student{name='小明', age=10}
           

skip & skipLast & skipUntil & skipWhile

过滤掉数据的前n项,或者一段时间等

Observable.range(1, 10).skip(9).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer mInteger) throws Exception {
                System.out.println(mInteger);
            }
        });
结果:
10
           

take & takeLast & takeUntil & takeWhile

按照某种规则进行选择操作

Observable.range(1, 10).takeWhile(new Predicate<Integer>() {
            @Override
            public boolean test(Integer mInteger) throws Exception {
                return mInteger < 3;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer mInteger) throws Exception {
                System.out.println(mInteger);
            }
        });
结果:
1
2
           

ignoreElements

过滤所有源Observable产生的结果,只会把Observable的onComplete和onError事件通知给订阅者

sample

定期扫描源Observable产生的结果,在指定的间隔周期内进行采样

Observable.interval(100,TimeUnit.MILLISECONDS).sample(300,TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long mLong) throws Exception {
                System.out.println(mLong);
            }
        });
结果:
1
5
7
           

throttleFirst & throttleLast & throttleLatest & throttleWithTimeout

指定的事件范围内发射出来的第一个/最后一个数据

debounce

用来限制发射频率的,它仅在过了一段指定的时间还没发射数据时才发射一个数据

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // send events with simulated time wait
                emitter.onNext(1); // skip
                Thread.sleep(400);
                emitter.onNext(2); // deliver
                Thread.sleep(505);
                emitter.onNext(3); // skip
                Thread.sleep(100);
                emitter.onNext(4); // deliver
                Thread.sleep(605);
                emitter.onNext(5); // deliver
                Thread.sleep(510);
                emitter.onComplete();
            }
        }).debounce(500, TimeUnit.MILLISECONDS).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer mLong) throws Exception {
                System.out.println(mLong);
            }
        });
结果:
2
4
5
           

组合Observable操作符

´startwith指定的数据源的之前插入几个数据

´merge&mergeArray可以让多个数据源的数据合并起来交叉发射

´concat & concatArray & concatEager将多个Observable拼接起来按照传入的Observable的顺序进行发射,一个Observable没有发射完毕之前不会发射另一个Observable里面的数据

´zip & zipArray & zipIterable将多个Observable数据合并起来它只发射与发射数据项最少的那个Observable一样多的数据

´combineLatest会存储每个Observable的最近的值的,任意一个Observable发射新值时都会触发操作

´join类似于combineLatest操作符,但是join操作符可以控制每个Observable产生结果的生命周期,在每个结果的生命周期内,可以与另一个Observable产生的结果按照一定的规则进行合并

错误处理

onErrorReturn遇到错误时,发送1个特殊事件然后正常终止

onErrorResumeNext遇到错误时,发送1个新的Observable

onExceptionResumeNext遇到异常时,发送1个新的Observable

retry当出现错误时,让被观察者(Observable)重新发射数据

retryUntil

retrywhen遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者(Observable)& 发送事件

异步操作符

´fromAction( )— 将一个Action转换成一个调用action且当有Subscriver订阅时发射它的结果的Observable

´fromCallable( )— 将一个Callable转换成调用callable且发射它的结果或者异常的Observable。

´fromRunnable( )— 将一个Runnable转换成Observable,这个Observable执行runable且发射它的结果当有订阅者订阅的时候

´runAsync( ) —返回一个 StoppableObservable ,这个StoppableObservable 发射多个 action 当被一个特定的scheduler的action生成的时候。

公用操作符

´ObserveOn指定Observable在哪个调度器上发送通知给观察者

´SubscribeOn指定Observable自身在哪个调度器上执行

´Timeout如果原始Observable过了指定的一段时长没有发射任何数据,Timeout操作符会以一个onError通知终止这个Observable,或者继续一个备用的Observable。

´Timestamp它将一个发射T类型数据的Observable转换为一个发射类型为Timestamped的数据的Observable,每一项都包含数据的发射时间。

´delay( )— 从一个Obseravble按一定数量向将来转移一些发射对象

´delaySubscription( )— 持有一个Subscriber的订阅请求一段指定的时间在传入它到源Observable之前。

´materialize( )— 把一个Observable转换一系列通知。

´dematerialize( )—是Materialize的逆向过程

´timestamp( )— 为每一个被Observable发出的item附上时间戳

´serialize( )— 强制一个Observable执行序列化调用

´cache( )— 记住被Observable发射的item序列且为未来的订阅者发射相同的序列。

´timeInterval( )— 在源Observable的两个连续的发射之间暂停一段时间

´using( )— 创建一个与Observable相同生命周期的可支配的资源

´single( )— 如果Observable再发射一个item后完成了,那么返回这个item,否则抛出一个异常

´singleOrDefault( ) —如果Observable发射一个item后完成了,那么返回这个item,否则返回默认item

´repeat( )— 创建一个重复发射特殊item或者item序列的 Observable

´repeatWhen( ) —创建一个重复发射特殊item或者item序列的 Observable,取决于第二个Observable的发射

数学运算符

´averageDouble

´averageFloat

´max

´min

´sumDouble

´sumFloat

´sumInt

´sumLong

聚合操作符

´Reduce和Scan类似Scan每次都输出结果,Reduce输出最终结果

´reduceWith返回一个对象,然后发射数据时候都带上这个对象

   可以做收集数据,比如对象是List或者map。最后返回这个对象

´count 用来统计源 Observable 完成的时候一共发射了多少个数据。

´Collect 和reduceWith类似,只是第二个参数BiConsumer没有返回值

´collectInto和reduceWith类似第一个入参不用callback去生成

´toList 对发射的数据加到一个List里最后返回

´toSortedList对发射的数据进行排序后返回

´toMap对发射数据用一个方法返回某个数据的key最后返回map

´toMultimap和toMap类似,区别是value是Collection

公用操作符三—Do系列

注册一个Action,在某种条件下回调,可以在事件流的各个阶段插入一个回调

´doOnEach( )—Observable发射每个item

´doOnNext( )— 就在Observable传入onNext事件顺流而下之前

´doAfterNext( ) —就在Observable传入onNext事件顺流而下之后

´doOnSubscribe( )—Observabler订阅Observable

´doOnUnsubscribe( ) —Observable取消订阅Observable

´doOnCompleted( )— 当Observable成功完成

´doOnError( )— 当Observable带错误完成

´doOnTerminate( )— 在Observable结束之前,不管成功或是出错

´finallyDo( )—当Observable 终止之后会被调用,无论是正常还是异常终止

´doAfterTerminate( )—Observable结束之后,不管成功或是出错

Subject

´Subject在ReactiveX是作为observer和observerable的一个bridge或者proxy。因为它是一个观察者,所以它可以订阅一个或多个可观察对象,同时因为他是一个可观测对象,所以它可以传递和释放它观测到的数据对象,并且能释放新的对象。

´ReplaySubject 可观察到的所有数据(包括订阅前和订阅后的,订阅前的数量可控)

´BehaviorSubject 订阅前最后一个数据和订阅后接收到的所有数据,相当于 ReplaySubject.createWithSize(1)

´PublishSubject可观察订阅后接收到的数据)

´AsyncSubject 仅可观察接收到的最后一个数据

´SerializedSubject在并发情况下使用SerializedSubject,并发时只允许一个线程调用onnext等方法

´UnicastSubject 仅支持订阅一次的Subject

Processor

´Processor 和 Subject 的作用相同的,既是观察者,也是被观察者。Subject 不支持背压,是 RxJava 1.x 继承过来的,Processor 继承 FlowableProcessor,支持背压。

´AsyncProcessor不论何时订阅,都只发射最后一个数据,如果因为异常而终止,不会释放任何数据,但是会向 Observer 传递一个异常通知。

´BehaviorProcessor发射订阅之前的一个数据和订阅之后的全部数据。如果订阅之前没有值,可以使用默认值。

´PublishProcessor从哪里订阅就从哪里发射数据。

´ReplayProcessor无论何时订阅,都发射所有的数据。

´SerializedProcessor其它 Processor 不要在多线程上发射数据,如果确实要在多线程上使用,用这个 Processor 封装,可以保证在一个时刻只在一个线程上执行。

´UnicastProcessor只能有一个观察者。

关于背压

´什么是背压?

´被观察对象发射item比操作符或者订阅者消费更快的情况

´例如:想象一下用zip操作符去合并两个无限的Observable,其中一个发射item的频次是另一个的两倍。一个zip操作符的实现必须维持一个不断扩大的缓冲池来缓存被更快的Observable发射的item ,最终合并被慢的Observable发射的item。这将导致RxJava占有大量的系统资源。

背压的五种策略

´MISSING, onNext事件是在不进行任何缓冲或删除的情况下写入的。相当于没有指定背压策略

´ERROR,  如果下游无法跟上,导致如果缓存池满了,则会发出反向压力异常信号。

´BUFFER,缓冲所有onNext的值,直到下游消耗它。(可能为OOM)

´DROP,    如果下游无法跟上,导致如果缓存池满了,丢弃将要放入缓存池中的数据。

´LATEST   与Drop策略一样,如果缓存池满了,丢弃将要放入缓存池中的数据,不同的是,不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中,来保证观察者在接收到完成通知之前,能够接收到Flowable最新发射的一条数据。

解决背压

´增加缓冲区大小

´使用标准运算符批处理/跳过值

´由消费者控制流速

´flowable为运算符托管128个元素的默认缓冲区大小,可通过bufferSize()进行访问,这些元素可通过系统参数rx2.buffer-size全局覆盖。

´当下流消费Buffer的prefetch - (prefetch >> 2)+的时候才会重新缓存新数据(默认128的75%是96个,)

´下流控制:

1、onSubscribe 的参数类型不再是 Disposable,而是 Subscription,可以调用它的 cancel() 切断观察者与被观察者之间的联系。Subscription 还有一个 request(long n) 方法,用来向生产者申请可以消费的事件数量。这样便可以根据本身的消费能力进行消费事件。

2、当调用了 request() 方法后,生产者便发送对应数量的事件供消费者消费。即生产者要求多少,消费者就发多少。

3、如果不显式调用 request 就表示消费能力为 0。request 这个方法若不调用,下游的 onNext 与 OnComplete 都不会调用。

demo里有个针对背压例子,具体请查看下Example_4_Flowable这个类

RxJava的一些扩展

´Retrofit+RxJava+Okhttp

´RxBinding

´RxPermission

´RxBus

´RxLifecycle

总结

下面是一些比较常用的,会用这些基本上就够用了

  1. 创建:just,from,Interval
  2. 线程调度:observerOn,subscribeOn
  3. 转换:map,flatmap,compose
  4. 组合,zip,concat,merge等
  5. 公用:delay,doOn系列等
  6. 错误处理:onErrorReturn,

    onErrorResumeNext,

    onExceptionResumeNext,

    retry,

    retryUntil,

    retrywhen,

  7. Subject

最后

基本提到的操作符都有例子:https://github.com/YangKee/RxJavaDemo