天天看点

RxJava 变换、 组合 、 合并操作符

/**
 * @author :houde
 *         时间:2018/1/23
 *         Des:RxJava 变换操作符
 */

public class RxOperateActivity extends AppCompatActivity {
    private final String TAG = "RxOperateActivity";
    Observable<Integer> observable1 = Observable.just(1,2,3,4);
    Observable<String>  observable2 = Observable.just("A","B","C");
    private Observer<String> stringObserver = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG,"开始采用subscribe连接");
        }

        @Override
        public void onNext(String s) {
            Log.e(TAG,s);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG,e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG,"对Complete事件作出响应");
        }
    };
    private Observer<Integer> intObserver = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG,"开始采用subscribe连接");
        }

        @Override
        public void onNext(Integer integer) {
            Log.e(TAG,"事件 = " + integer);

        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG,e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG,"对Complete事件作出响应");
        }
    } ;
    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_image);
        //转换操作符
        /**
         *  作用
         *      对被观察者发送的每1个事件都通过指定的函数处理,从而变换成另外一种事件
         *      即:将被观察者发送的事件转换为任意的类型事件。
         *  应用场景
         *       数据类型转换
         *   具体使用
         *       下面以将 使用Map()将事件的参数从整型变换成字符串类型为例子说明
         */
        map();
        /**
         * 作用:
         *  将被观察者发送的事件序列进行拆分&单独转换,再合并成一个新的事件序列,最后再进行发送
         *
         *  原理
         *      1.为事件序列中每个事件都创建一个 Observable 对象;
         *      2.将对每个 原始事件 转换后的 新事件 都放入到对应 Observable对象;
         *      3.将新建的每个Observable 都合并到一个 新建的、总的Observable 对象;
         *      4.新建的、总的Observable 对象 将 新合并的事件序列 发送给观察者(Observer)
         *  应用场景
         *      无序的将被观察者发送的整个事件序列进行变换
         */
        flatMap();
        /**
         * 作用:类似FlatMap()操作符
         * 与FlatMap()的 区别在于:拆分 & 重新合并生成的事件序列 的顺序 = 被观察者旧序列生产的顺序
         * 应用场景
         *      有序的将被观察者发送的整个事件序列进行变换
         */
        concatMap();
        /**
         * 作用
         *  定期从 被观察者(Observable)需要发送的事件中获取一定数量的事件&放到缓存区中,
         *  最终发送
         *
         * 应用场景
         *  缓存被观察者发送的事件
         */
        buffer();
        //组合和并操作符
        /**
         * 作用
         *      组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
         *      二者区别:
         *      组合被观察者的数量,即concat()组合被观察者数量≤4个,
         *      而concatArray()则可>4个
         */
        concat();
        concatArray();
        /**
         * 作用
         *      组合多个被观察者一起发送数据,合并后 按时间线并行执行
         *
         * 二者区别:
         * 组合被观察者的数量,即merge()组合被观察者数量≤4个,
         * 而mergeArray()则可>4个
         *
         * 区别上述concat()操作符:
         * 同样是组合多个被观察者一起发送数据,但concat()操作符合并后是按发送顺序串行执行
         */
        merge();
        mergeArray();
        /**
         * 背景:
         *      使用merge和concat操作符时,
         * 冲突:
         *      若其中一个被观察者发出onError事件,则会终止其他被观察者继续发送事件
         * 解决方案:
         *      若希望onError事件推迟到其他被观察者发送完事件之后再触发
         *      即需要使用对应的mergeDelayError()或concatDelayError操作符
         *
         */
        concatDelayError();
        mergeDelayError();
        //事件的合并
        /**
         * 作用
         *      合并多个被观察者(Observable)发送的事件,
         *      生成一个新的事件序列(即组合过后的事件序列),并最终发送
         * 特别注意:
         *      事件组合方式 = 严格按照原先事件序列 进行对位合并
         *      最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量
         * 特别注意:
         *      尽管被观察者2的事件D没有事件与其合并,但还是会继续发送
         *      若在被观察者1 & 被观察者2的事件序列最后发送onComplete()事件,
         *      则被观察者2的事件D也不会发送,测试结果如下
         * 定义:
         *      属于Rxjava中的组合
         * 作用:
         *      1.合并多个被观察者(Observable)发送的事件
         *      2.生成一个新的事件序列(即合并之后的序列),并最终发送
         * 原理:
         *      1.事件组合方式 = 严格按照原先事件序列进行对位合并
         *      2.最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量
         *
         * 应用场景:
         *      1.当展示的信息需要从多个地方获取(即 信息 = 信息1 + 信息2)& 统一结合后再展示
         *      2.如:合并网络请求的发送 & 统一展示结果
         */
        zip();
        /**
         * 作用
         *      当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables
         *      的最新(最后)一个数据与另外一个Observable发送的每个数据结合,最终基于该
         *      函数的结果发送数据
         * 与Zip()的区别:
         *      Zip() = 按个数合并,
         *      即1对1合并;CombineLatest() = 按时间合并,即在同一个时间点上合并
         *
         * combineLatestDelayError()
         *      作用类似于concatDelayError() / mergeDelayError() ,即错误处理,此处不作过多描述
         */
        combineLatest();
        /**
         * 作用
         *     把被观察者需要发送的事件聚合成1个事件 & 发送
         *
         *  聚合的逻辑根据需求撰写,但本质都是前2个数据聚合,然后与后1个数据继续进行聚合,依次类推
         */
        reduce();
        /**
         *作用
         *    将被观察者Observable发送的数据事件收集到一个数据结构里
         */
        collect();
        //发送事件前追加发送事件
        /**
         * 作用
         *     在一个被观察者发送事件前,追加发送一些数据/一个新的被观察者
         */
        startWith();
        startWithArray();
        //统计发送事件数量
        /**
         * 作用
         *      统计被观察者发送事件的数量
         */
        count();

    }

    private void count() {
        observable1.count().subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.e(TAG,"发送事件的次数 = " + aLong);
            }
        });
    }

    private void startWithArray() {
        Observable.just(4,5,6,7)
                .startWith(0)
                .startWithArray(1,2,3)
                .subscribe(intObserver);
    }

    private void startWith() {
        Observable.just(1,2,3,4)
                .startWith(0)
                .subscribe(intObserver);
    }

    private void collect() {
        observable1.collect(
                // 1. 创建数据结构(容器),用于收集被观察者发送的数据
                new Callable<ArrayList<Integer>>() {
            @Override
            public ArrayList<Integer> call() throws Exception {
                return new ArrayList<>();
            }
                    // 2. 对发送的数据进行收集
        }, new BiConsumer<ArrayList<Integer>, Integer>() {
            @Override
            public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
                // 参数说明:list = 容器,integer = 后者数据
                list.add(integer);
                // 对发送的数据进行收集
            }
        }).subscribe(new Consumer<ArrayList<Integer>>() {
            @Override
            public void accept(ArrayList<Integer> list) throws Exception {
                Log.e(TAG, "本次发送的数据是: " + list);
            }
        });
    }

    private void reduce() {
        observable1.reduce(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer s1, Integer s2) throws Exception {
                Log.e(TAG, "本次计算的数据是: "+s1 +" 乘 "+ s2);
                return s1 * s2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "最终计算的结果是: " + integer);
            }
        });
    }

    private void combineLatest() {
        Log.e(TAG,"-------------------combineLatest-------------------");

        Observable.combineLatest(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return s + integer;
            }
        }).subscribe(stringObserver);

    }

    private void zip() {
        Log.e(TAG,"-------------------zip-------------------");


        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).subscribe(stringObserver);
    }

    private void mergeDelayError() {
        Log.e(TAG,"-------------------mergeDelayError-------------------");
        Observable.mergeArrayDelayError(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        // 发送Error事件,因为使用了concatDelayError,所以第2个Observable将会发送事件,等发送完毕后,再发送错误事件
                        emitter.onError(new NullPointerException("这里发送了一个onError()"));
                        emitter.onComplete();
                    }
                }),
                Observable.just(4, 5, 6))
                .subscribe(intObserver);
    }

    private void concatDelayError() {
        Log.e(TAG,"-------------------concatDelayError-------------------");
        Observable.concat(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        // 发送Error事件,因为无使用concatDelayError,所以第2个Observable将不会发送事件
                        emitter.onError(new NullPointerException("这里发送了一个onError()"));
                        emitter.onComplete();
                    }
                }),
                Observable.just(4, 5, 6))
                .subscribe(intObserver);

    }

    private void mergeArray(){
        Log.e(TAG,"-------------------mergeArray-------------------");
        Observable.mergeArray(Observable.just(1,2,3),
                Observable.just(4,5,6),
                Observable.just(7,8,9),
                Observable.just(10,11,12),
                Observable.just(13,14,15),
                Observable.just(16,17,18)
                ).subscribe(intObserver);
    }
    private void merge(){
        Log.e(TAG,"-------------------merge-------------------");

        Observable.merge(Observable.just(1,2,3,4),
                Observable.just(5,6),
                Observable.just(7,8,9),
                Observable.just(10,11,12,13)
                )
                .subscribe(intObserver);
    }
    private void concatArray(){
        Log.e(TAG,"-------------------concatArray-------------------");
        Observable.concatArray(Observable.just(1,2,3),
                Observable.just(4,5,6),
                Observable.just(9,10),
                Observable.just(11,12,13),
                Observable.just(14,15,16)
        ).subscribe(intObserver);
    }
    private void concat(){
        // concat():组合多个被观察者(≤4个)一起发送数据
        // 注:串行执行
        Log.e(TAG,"-------------------concat-------------------");

        Observable.concat(Observable.just(1,2,3),
                Observable.just(4,5,6),
                Observable.just(9,10)
                ).subscribe(intObserver);
    }

    private void buffer() {
        Log.e(TAG,"-------------------buffer-------------------");

        Observable.just(1,2,3,4,5,6,7,8)
                // 设置缓存区大小 & 步长
                // 缓存区大小 = 每次从被观察者中获取的事件数量
                // 步长 = 每次获取新事件的数量
                .buffer(3,1)
                .subscribe(new Observer<List<Integer>>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e(TAG,"开始采用subscribe连接");
                    }

                    @Override
                    public void onNext(List<Integer> ints) {
                        Log.e(TAG,"缓存区里的事件个数" + ints.size());
                        for(int i = 0 ,size = ints.size(); i < size;i++){
                            Log.e(TAG,"事件 = " + i);
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG,e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG,"对Complete事件作出响应");
                    }
                });
    }

    private void concatMap() {
        Log.e(TAG,"-------------------concatMap-------------------");
        Observable.just(4,3,2,1)
                .concatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        return Observable.fromIterable(getEvents(integer));
                    }
                })
                .subscribe(stringObserver);
    }

    private void flatMap() {
        Log.e(TAG,"-------------------flatMap-------------------");
        Observable.just(1,2,3,4)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        return Observable.fromIterable(getEvents(integer));
                    }
                }).subscribe(stringObserver);
    }

    @NonNull
    private List<String> getEvents(Integer integer) {
        List<String> event = new ArrayList<>(3);
        for(int i =  0 ; i < 3 ; i++){
            event.add("我是事件 " + integer + "拆分后的子事件" + i);
        }
        return event;
    }

    private void map() {
        Log.e(TAG,"-------------------map-------------------");
        Observable.just(1,2,3,4).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "使用 Map变换操作符 将事件" + integer +"的参数从 整型"+integer + " 变换成 字符串类型" + integer;
            }
        }).subscribe(stringObserver);
    }
}