天天看點

RxJava 變換、 組合 、 合并操作符

/**
 * @author :houde
 *         時間:2018/1/23
 *         Des:RxJava 變換操作符
 */

public class RxOperateActivity extends AppCompatActivity {
    private final String TAG = "RxOperateActivity";
    Observable<Integer> observable1 = Observable.just(1,2,3,4);
    Observable<String>  observable2 = Observable.just("A","B","C");
    private Observer<String> stringObserver = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG,"開始采用subscribe連接配接");
        }

        @Override
        public void onNext(String s) {
            Log.e(TAG,s);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG,e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG,"對Complete事件作出響應");
        }
    };
    private Observer<Integer> intObserver = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG,"開始采用subscribe連接配接");
        }

        @Override
        public void onNext(Integer integer) {
            Log.e(TAG,"事件 = " + integer);

        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG,e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG,"對Complete事件作出響應");
        }
    } ;
    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_image);
        //轉換操作符
        /**
         *  作用
         *      對被觀察者發送的每1個事件都通過指定的函數處理,進而變換成另外一種事件
         *      即:将被觀察者發送的事件轉換為任意的類型事件。
         *  應用場景
         *       資料類型轉換
         *   具體使用
         *       下面以将 使用Map()将事件的參數從整型變換成字元串類型為例子說明
         */
        map();
        /**
         * 作用:
         *  将被觀察者發送的事件序列進行拆分&單獨轉換,再合并成一個新的事件序列,最後再進行發送
         *
         *  原理
         *      1.為事件序列中每個事件都建立一個 Observable 對象;
         *      2.将對每個 原始事件 轉換後的 新事件 都放入到對應 Observable對象;
         *      3.将建立的每個Observable 都合并到一個 建立的、總的Observable 對象;
         *      4.建立的、總的Observable 對象 将 新合并的事件序列 發送給觀察者(Observer)
         *  應用場景
         *      無序的将被觀察者發送的整個事件序列進行變換
         */
        flatMap();
        /**
         * 作用:類似FlatMap()操作符
         * 與FlatMap()的 差別在于:拆分 & 重新合并生成的事件序列 的順序 = 被觀察者舊序列生産的順序
         * 應用場景
         *      有序的将被觀察者發送的整個事件序列進行變換
         */
        concatMap();
        /**
         * 作用
         *  定期從 被觀察者(Observable)需要發送的事件中擷取一定數量的事件&放到緩存區中,
         *  最終發送
         *
         * 應用場景
         *  緩存被觀察者發送的事件
         */
        buffer();
        //組合和并操作符
        /**
         * 作用
         *      組合多個被觀察者一起發送資料,合并後 按發送順序串行執行
         *      二者差別:
         *      組合被觀察者的數量,即concat()組合被觀察者數量≤4個,
         *      而concatArray()則可>4個
         */
        concat();
        concatArray();
        /**
         * 作用
         *      組合多個被觀察者一起發送資料,合并後 按時間線并行執行
         *
         * 二者差別:
         * 組合被觀察者的數量,即merge()組合被觀察者數量≤4個,
         * 而mergeArray()則可>4個
         *
         * 差別上述concat()操作符:
         * 同樣是組合多個被觀察者一起發送資料,但concat()操作符合并後是按發送順序串行執行
         */
        merge();
        mergeArray();
        /**
         * 背景:
         *      使用merge和concat操作符時,
         * 沖突:
         *      若其中一個被觀察者發出onError事件,則會終止其他被觀察者繼續發送事件
         * 解決方案:
         *      若希望onError事件推遲到其他被觀察者發送完事件之後再觸發
         *      即需要使用對應的mergeDelayError()或concatDelayError操作符
         *
         */
        concatDelayError();
        mergeDelayError();
        //事件的合并
        /**
         * 作用
         *      合并多個被觀察者(Observable)發送的事件,
         *      生成一個新的事件序列(即組合過後的事件序列),并最終發送
         * 特别注意:
         *      事件組合方式 = 嚴格按照原先事件序列 進行對位合并
         *      最終合并的事件數量 = 多個被觀察者(Observable)中數量最少的數量
         * 特别注意:
         *      盡管被觀察者2的事件D沒有事件與其合并,但還是會繼續發送
         *      若在被觀察者1 & 被觀察者2的事件序列最後發送onComplete()事件,
         *      則被觀察者2的事件D也不會發送,測試結果如下
         * 定義:
         *      屬于Rxjava中的組合
         * 作用:
         *      1.合并多個被觀察者(Observable)發送的事件
         *      2.生成一個新的事件序列(即合并之後的序列),并最終發送
         * 原理:
         *      1.事件組合方式 = 嚴格按照原先事件序列進行對位合并
         *      2.最終合并的事件數量 = 多個被觀察者(Observable)中數量最少的數量
         *
         * 應用場景:
         *      1.當展示的資訊需要從多個地方擷取(即 資訊 = 資訊1 + 資訊2)& 統一結合後再展示
         *      2.如:合并網絡請求的發送 & 統一展示結果
         */
        zip();
        /**
         * 作用
         *      當兩個Observables中的任何一個發送了資料後,将先發送了資料的Observables
         *      的最新(最後)一個資料與另外一個Observable發送的每個資料結合,最終基于該
         *      函數的結果發送資料
         * 與Zip()的差別:
         *      Zip() = 按個數合并,
         *      即1對1合并;CombineLatest() = 按時間合并,即在同一個時間點上合并
         *
         * combineLatestDelayError()
         *      作用類似于concatDelayError() / mergeDelayError() ,即錯誤處理,此處不作過多描述
         */
        combineLatest();
        /**
         * 作用
         *     把被觀察者需要發送的事件聚合成1個事件 & 發送
         *
         *  聚合的邏輯根據需求撰寫,但本質都是前2個資料聚合,然後與後1個資料繼續進行聚合,依次類推
         */
        reduce();
        /**
         *作用
         *    将被觀察者Observable發送的資料事件收集到一個資料結構裡
         */
        collect();
        //發送事件前追加發送事件
        /**
         * 作用
         *     在一個被觀察者發送事件前,追加發送一些資料/一個新的被觀察者
         */
        startWith();
        startWithArray();
        //統計發送事件數量
        /**
         * 作用
         *      統計被觀察者發送事件的數量
         */
        count();

    }

    private void count() {
        observable1.count().subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.e(TAG,"發送事件的次數 = " + aLong);
            }
        });
    }

    private void startWithArray() {
        Observable.just(4,5,6,7)
                .startWith(0)
                .startWithArray(1,2,3)
                .subscribe(intObserver);
    }

    private void startWith() {
        Observable.just(1,2,3,4)
                .startWith(0)
                .subscribe(intObserver);
    }

    private void collect() {
        observable1.collect(
                // 1. 建立資料結構(容器),用于收集被觀察者發送的資料
                new Callable<ArrayList<Integer>>() {
            @Override
            public ArrayList<Integer> call() throws Exception {
                return new ArrayList<>();
            }
                    // 2. 對發送的資料進行收集
        }, new BiConsumer<ArrayList<Integer>, Integer>() {
            @Override
            public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
                // 參數說明:list = 容器,integer = 後者資料
                list.add(integer);
                // 對發送的資料進行收集
            }
        }).subscribe(new Consumer<ArrayList<Integer>>() {
            @Override
            public void accept(ArrayList<Integer> list) throws Exception {
                Log.e(TAG, "本次發送的資料是: " + list);
            }
        });
    }

    private void reduce() {
        observable1.reduce(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer s1, Integer s2) throws Exception {
                Log.e(TAG, "本次計算的資料是: "+s1 +" 乘 "+ s2);
                return s1 * s2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "最終計算的結果是: " + integer);
            }
        });
    }

    private void combineLatest() {
        Log.e(TAG,"-------------------combineLatest-------------------");

        Observable.combineLatest(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return s + integer;
            }
        }).subscribe(stringObserver);

    }

    private void zip() {
        Log.e(TAG,"-------------------zip-------------------");


        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).subscribe(stringObserver);
    }

    private void mergeDelayError() {
        Log.e(TAG,"-------------------mergeDelayError-------------------");
        Observable.mergeArrayDelayError(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        // 發送Error事件,因為使用了concatDelayError,是以第2個Observable将會發送事件,等發送完畢後,再發送錯誤事件
                        emitter.onError(new NullPointerException("這裡發送了一個onError()"));
                        emitter.onComplete();
                    }
                }),
                Observable.just(4, 5, 6))
                .subscribe(intObserver);
    }

    private void concatDelayError() {
        Log.e(TAG,"-------------------concatDelayError-------------------");
        Observable.concat(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        // 發送Error事件,因為無使用concatDelayError,是以第2個Observable将不會發送事件
                        emitter.onError(new NullPointerException("這裡發送了一個onError()"));
                        emitter.onComplete();
                    }
                }),
                Observable.just(4, 5, 6))
                .subscribe(intObserver);

    }

    private void mergeArray(){
        Log.e(TAG,"-------------------mergeArray-------------------");
        Observable.mergeArray(Observable.just(1,2,3),
                Observable.just(4,5,6),
                Observable.just(7,8,9),
                Observable.just(10,11,12),
                Observable.just(13,14,15),
                Observable.just(16,17,18)
                ).subscribe(intObserver);
    }
    private void merge(){
        Log.e(TAG,"-------------------merge-------------------");

        Observable.merge(Observable.just(1,2,3,4),
                Observable.just(5,6),
                Observable.just(7,8,9),
                Observable.just(10,11,12,13)
                )
                .subscribe(intObserver);
    }
    private void concatArray(){
        Log.e(TAG,"-------------------concatArray-------------------");
        Observable.concatArray(Observable.just(1,2,3),
                Observable.just(4,5,6),
                Observable.just(9,10),
                Observable.just(11,12,13),
                Observable.just(14,15,16)
        ).subscribe(intObserver);
    }
    private void concat(){
        // concat():組合多個被觀察者(≤4個)一起發送資料
        // 注:串行執行
        Log.e(TAG,"-------------------concat-------------------");

        Observable.concat(Observable.just(1,2,3),
                Observable.just(4,5,6),
                Observable.just(9,10)
                ).subscribe(intObserver);
    }

    private void buffer() {
        Log.e(TAG,"-------------------buffer-------------------");

        Observable.just(1,2,3,4,5,6,7,8)
                // 設定緩存區大小 & 步長
                // 緩存區大小 = 每次從被觀察者中擷取的事件數量
                // 步長 = 每次擷取新事件的數量
                .buffer(3,1)
                .subscribe(new Observer<List<Integer>>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e(TAG,"開始采用subscribe連接配接");
                    }

                    @Override
                    public void onNext(List<Integer> ints) {
                        Log.e(TAG,"緩存區裡的事件個數" + ints.size());
                        for(int i = 0 ,size = ints.size(); i < size;i++){
                            Log.e(TAG,"事件 = " + i);
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG,e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG,"對Complete事件作出響應");
                    }
                });
    }

    private void concatMap() {
        Log.e(TAG,"-------------------concatMap-------------------");
        Observable.just(4,3,2,1)
                .concatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        return Observable.fromIterable(getEvents(integer));
                    }
                })
                .subscribe(stringObserver);
    }

    private void flatMap() {
        Log.e(TAG,"-------------------flatMap-------------------");
        Observable.just(1,2,3,4)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        return Observable.fromIterable(getEvents(integer));
                    }
                }).subscribe(stringObserver);
    }

    @NonNull
    private List<String> getEvents(Integer integer) {
        List<String> event = new ArrayList<>(3);
        for(int i =  0 ; i < 3 ; i++){
            event.add("我是事件 " + integer + "拆分後的子事件" + i);
        }
        return event;
    }

    private void map() {
        Log.e(TAG,"-------------------map-------------------");
        Observable.just(1,2,3,4).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "使用 Map變換操作符 将事件" + integer +"的參數從 整型"+integer + " 變換成 字元串類型" + integer;
            }
        }).subscribe(stringObserver);
    }
}