天天看點

Rxjava中的操作符學習筆記1. Create2. Zip3. Concat 4. map()  flatMap()  concactMap()

1. Create

create

 操作符應該是最常見的操作符了,主要用于産生一個 

Obserable

 被觀察者對象

Rxjava中的操作符學習筆記1. Create2. Zip3. Concat 4. map()  flatMap()  concactMap()
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                mRxOperatorsText.append("Observable emit 1" + "\n");
                Log.e(TAG, "Observable emit 1" + "\n");
                e.onNext(1);
                mRxOperatorsText.append("Observable emit 2" + "\n");
                Log.e(TAG, "Observable emit 2" + "\n");
                e.onNext(2);
                mRxOperatorsText.append("Observable emit 3" + "\n");
                Log.e(TAG, "Observable emit 3" + "\n");
                e.onNext(3);
                e.onComplete();
                mRxOperatorsText.append("Observable emit 4" + "\n");
                Log.e(TAG, "Observable emit 4" + "\n" );
                e.onNext(4);
            }
        })
           

2. Zip

zip

 專用于合并事件,該合并不是連接配接(連接配接操作符後面會說),而是兩兩配對,也就意味着,最終配對出的 

Observable

 發射事件數目隻和少的那個相同。

Observable.zip(getStringObservable(), getIntegerObservable(), new BiFunction<String, Integer, String>() {
            @Override
            public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
                return s + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                mRxOperatorsText.append("zip : accept : " + s + "\n");
                Log.e(TAG, "zip : accept : " + s + "\n");
            }
        });

           

3. Concat

對于單一的把兩個發射器連接配接成一個發射器,雖然 

zip

 不能完成,但我們還是可以自力更生,官方提供的 

concat

 讓我們的問題得到了完美解決。

Rxjava中的操作符學習筆記1. Create2. Zip3. Concat 4. map()  flatMap()  concactMap()
Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("concat : "+ integer + "\n");
                        Log.e(TAG, "concat : "+ integer + "\n" );
                    }
                });
           

4. map()  flatMap()  concactMap()

map()

 是一對一的轉化,将事件流中的一個對象轉換為另外一種類型的對象

Rxjava中的操作符學習筆記1. Create2. Zip3. Concat 4. map()  flatMap()  concactMap()

flatMap()是一對多的轉換,且傳回的是一個Observable對象,并且這個 

Observable

 對象并不是被直接發送到了 

Subscriber

 的回調方法中。 

flatMap()

 的原理是這樣的:1. 使用傳入的事件對象建立一個 

Observable

 對象;2. 并不發送這個 

Observable

, 而是将它激活,于是它開始發送事件;3. 每一個建立出來的 

Observable

 發送的事件,都被彙入同一個 

Observable

 ,而這個 

Observable

 負責将這些事件統一交給 

Subscriber

 的回調方法。這三個步驟,把事件拆成了兩級,通過一組新建立的 

Observable

 将初始的對象『鋪平』之後通過統一路徑分發了下去。而這個『鋪平』就是 

flatMap()

 所謂的 flat

如列印一組學生的課程資訊:

Observable.create(new ObservableOnSubscribe<Integer>() {
           @Override
           public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
               e.onNext(1);
               e.onNext(2);
               e.onNext(3);
           }
       }).flatMap(new Function<Integer, ObservableSource<String>>() {
           @Override
           public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
               List<String> list = new ArrayList<>();
               for (int i = 0; i < 3; i++) {
                   list.add("I am value " + integer);
               }
               int delayTime = (int) (1 + Math.random() * 10);
               return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
           }
       }).subscribeOn(Schedulers.newThread())
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Consumer<String>() {
                   @Override
                   public void accept(@NonNull String s) throws Exception {
                       Log.e(TAG, "flatMap : accept : " + s + "\n");
                       mRxOperatorsText.append("flatMap : accept : " + s + "\n");
                   }
               });
           

flatMap不保證事件的順序,示意圖:

Rxjava中的操作符學習筆記1. Create2. Zip3. Concat 4. map()  flatMap()  concactMap()

⚠️注意:由于可以在嵌套的 

Observable

 中添加異步代碼, 

flatMap()

 也常用于嵌套的異步操作,例如嵌套的網絡請求。示例代碼(Retrofit + RxJava):

networkClient.token() // 傳回 Observable<String>,在訂閱時請求 token,并在響應後發送 token
    .flatMap(new Func1<String, Observable<Messages>>() {
        @Override
        public Observable<Messages> call(String token) {
            // 傳回 Observable<Messages>,在訂閱時請求消息清單,并在響應後發送請求到的消息清單
            return networkClient.messages();
        }
    })
    .subscribe(new Action1<Messages>() {
        @Override
        public void call(Messages messages) {
            // 處理顯示消息清單
            showMessages(messages);
        }
    });
           

傳統的嵌套請求需要使用嵌套的 Callback 來實作。而通過 

flatMap()

 ,可以把嵌套的請求寫在一條鍊中,進而保持程式邏輯的清晰