天天看點

(2)RxJava2+Retrofit2+OkHttp3系列(RxJava2-2)

今天我們來說一下RxJava2的相關操作符。

操作符

Create

create 操作符應該是最常見的操作符了,主要用于産生一個 Obserable 被觀察者對象。

以後統一把被觀察者 Observable 稱為發射器(上遊事件),觀察者 Observer 稱為接收器(下遊事件)。

(2)RxJava2+Retrofit2+OkHttp3系列(RxJava2-2)
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                mRxOperatorsText.append("Observable emit 1" + "\n");
                Log.e(TAG, "Observable emit 1" + "\n");
                e.onNext();

                Log.e(TAG, "Observable emit 2" + "\n");
                e.onNext();

                Log.e(TAG, "Observable emit 3" + "\n");
                e.onNext();
                e.onComplete();

                Log.e(TAG, "Observable emit 4" + "\n" );
                e.onNext();
            }
        }).subscribe(new Observer<Integer>() {
            private int i;
            private Disposable mDisposable;

            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.e(TAG, "onSubscribe : " + d.isDisposed() + "\n" );
                mDisposable = d;
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.e(TAG, "onNext : value : " + integer + "\n" );
                i++;
                if (i == ) {
                    // 在RxJava 2.x 中,新增的Disposable可以做到切斷的操作,讓Observer觀察者不再接收上遊事件
                    mDisposable.dispose();
                    Log.e(TAG, "onNext : isDisposable : " + mDisposable.isDisposed() + "\n");
                }
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete" + "\n" );
            }
        });
           
(2)RxJava2+Retrofit2+OkHttp3系列(RxJava2-2)

Map

它的作用就是對上遊發送的每一個事件應用一個函數, 使得每一個事件都按照指定的函數去變化。

(2)RxJava2+Retrofit2+OkHttp3系列(RxJava2-2)
(2)RxJava2+Retrofit2+OkHttp3系列(RxJava2-2)
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext();
                emitter.onNext();
                emitter.onNext();
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "This is result " + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });

//結果
This is result  
This is result  
This is result 
           

map 基本作用就是将一個 Observable 通過某種函數關系,轉換為另一種 Observable,上面例子中就是把我們的 Integer 資料變成了 String 類型。

FlatMap

FlatMap将一個發送事件的上遊Observable變換為多個發送事件的Observables,然後将它們發射的事件合并後放進一個單獨的Observable裡,FlatMap 并不能保證事件的順序,如果需要保證,需要用到我們下面要講的 ConcatMap。

(2)RxJava2+Retrofit2+OkHttp3系列(RxJava2-2)
(2)RxJava2+Retrofit2+OkHttp3系列(RxJava2-2)

我們可以使用下面這張圖來了解一下:

(2)RxJava2+Retrofit2+OkHttp3系列(RxJava2-2)

上遊每發送一個事件, FlatMap都将建立一個新的水管, 然後發送轉換之後的新的事件, 下遊接收到的就是這些新的水管發送的資料。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext();
                emitter.onNext();
                emitter.onNext();
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = ; i < ; i++) {
                    list.add("I am value " + integer);
                }
                return Observable.fromIterable(list).delay(,TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });

//結果
I am value 
I am value 
I am value 
I am value 
I am value 
I am value 
I am value 
I am value 
I am value 
           

我們在flatMap中将上遊發來的每個事件轉換為一個新的發送三個String事件的水管, 為了看到flatMap結果是無序的,是以加了10毫秒的延時。

concatMap

它和flatMap的作用幾乎一模一樣, 隻是它的結果是嚴格按照上遊發送的順序來發送的。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext();
                emitter.onNext();
                emitter.onNext();
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = ; i < ; i++) {
                    list.add("I am value " + integer);
                }
                return Observable.fromIterable(list).delay(,TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });

//結果
I am value    
I am value    
I am value    
I am value    
I am value    
I am value    
I am value    
I am value    
I am value 
           

我們來看一個使用FlatMap的執行個體,使用者注冊成功後立即登入的請求。

//Retrofit2的使用方法,我們後續講解
public interface Api {
    @GET
    Observable<LoginResponse> login(@Body LoginRequest request);

    @GET
    Observable<RegisterResponse> register(@Body RegisterRequest request);
}
           

可以看到登入和注冊傳回的都是一個上遊Observable, 而我們的flatMap操作符的作用就是把一個Observable轉換為另一個Observable。

api.register(new RegisterRequest())//發起注冊請求
                .subscribeOn(Schedulers.io())//在IO線程進行網絡請求
                .observeOn(AndroidSchedulers.mainThread())//回到主線程去處理請求注冊結果
                .doOnNext(new Consumer<RegisterResponse>() {
                    @Override
                    public void accept(RegisterResponse registerResponse) throws Exception {
                        //先根據注冊的響應結果去做一些操作
                    }
                })
                .observeOn(Schedulers.io())//回到IO線程去發起登入請求
                .flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
                    @Override
                    public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
                        return api.login(new LoginRequest());
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())//回到主線程去處理請求登入的結果
                .subscribe(new Consumer<LoginResponse>() {
                    @Override
                    public void accept(LoginResponse loginResponse) throws Exception {
                        Toast.makeText(MainActivity.this, "登入成功", Toast.LENGTH_SHORT).show();
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Toast.makeText(MainActivity.this, "登入失敗", Toast.LENGTH_SHORT).show();
                    }
                });
           

Zip

Zip通過一個函數将多個Observable發送的事件結合到一起,然後發送這些組合到一起的事件,它按照嚴格的順序應用這個函數。它隻發射與發射資料項最少的那個Observable一樣多的資料。

(2)RxJava2+Retrofit2+OkHttp3系列(RxJava2-2)

上遊現在有兩根水管,其中一根水管負責發送圓形事件 , 另外一根水管負責發送三角形事件 , 通過Zip操作符, 使得圓形事件 和三角形事件 合并為了一個矩形事件。

(2)RxJava2+Retrofit2+OkHttp3系列(RxJava2-2)
  • 組合的過程是分别從 兩根水管裡各取出一個事件 來進行組合, 并且一個事件隻能被使用一次, 組合的順序是嚴格按照事件發送的順利 來進行的, 也就是說不會出現圓形1 事件和三角形B 事件進行合并, 也不可能出現圓形2 和三角形A 進行合并的情況。
  • 最終下遊收到的事件數量 是和上遊中發送事件最少的那一根水管的事件數量相同,當沒有足夠的事件來組合時,下遊就不會收到剩餘的事件。
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {           
    @Override                                                                                        
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {                     
        Log.d(TAG, "emit 1");                                                                        
        emitter.onNext();                                                                           

        Log.d(TAG, "emit 2");                                                                        
        emitter.onNext();                                                                          

        Log.d(TAG, "emit 3");                                                                        
        emitter.onNext();                                                                          

        Log.d(TAG, "emit 4");                                                                        
        emitter.onNext();                                                                          

        Log.d(TAG, "emit complete1");                                                                
        emitter.onComplete();                                                                        
    }     
    //兩個zip需要在不同的線程中,不然就是observable1先全部發送完成,在發送observable2                                                                                     
}).subscribeOn(Schedulers.io());                                                                                                  

Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {             
    @Override                                                                                        
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {                      
        Log.d(TAG, "emit A");                                                                        
        emitter.onNext("A");                                                                          

        Log.d(TAG, "emit B");                                                                        
        emitter.onNext("B");                                                                         

        Log.d(TAG, "emit C");                                                                        
        emitter.onNext("C");                                                                         

        Log.d(TAG, "emit complete2");                                                                
        emitter.onComplete();                                                                        
    }                                                                                                
}).subscribeOn(Schedulers.io());


Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {                 
    @Override                                                                                        
    public String apply(Integer integer, String s) throws Exception {                                
        return integer + s;                                                                          
    }                                                                                                
}).subscribe(new Observer<String>() {                                                                
    @Override                                                                                        
    public void onSubscribe(Disposable d) {                                                          
        Log.d(TAG, "onSubscribe");                                                                   
    }                                                                                                

    @Override                                                                                        
    public void onNext(String value) {                                                               
        Log.d(TAG, "onNext: " + value);                                                              
    }                                                                                                

    @Override                                                                                        
    public void onError(Throwable e) {                                                               
        Log.d(TAG, "onError");                                                                       
    }                                                                                                

    @Override                                                                                        
    public void onComplete() {                                                                       
        Log.d(TAG, "onComplete");                                                                    
    }                                                                                                
});

//結果
onSubscribe    
emit A         
emit          
onNext: A     
emit B         
emit          
onNext: B     
emit C         
emit          
onNext: C     
emit complete2 
onComplete
           

Zip的實踐場景,比如一個界面需要展示使用者的一些資訊, 而這些資訊分别要從兩個伺服器接口中擷取, 而隻有當兩個都擷取到了之後才能進行展示。

//Retrofit2用法,後續介紹
public interface Api {
    @GET
    Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request);

    @GET
    Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request);

}
           
Observable<UserBaseInfoResponse> observable1 =                                            
        api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());      

Observable<UserExtraInfoResponse> observable2 =                                           
        api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());    

//zip打包擷取兩個資訊并展示
Observable.zip(observable1, observable2,                                                  
        new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {         
            @Override                                                                     
            public UserInfo apply(UserBaseInfoResponse baseInfo,                          
                                  UserExtraInfoResponse extraInfo) throws Exception {     
                return new UserInfo(baseInfo, extraInfo);                                 
            }                                                                             
        }).observeOn(AndroidSchedulers.mainThread())                                      
        .subscribe(new Consumer<UserInfo>() {                                             
            @Override                                                                     
            public void accept(UserInfo userInfo) throws Exception {                      
                //do something;                                                           
            }                                                                             
        });
           

今天我們的操作符就說的這裡了,下節還是這些枯燥的操作符啊啊啊!