天天看點

RxJava 學習筆記

目錄

參考

添加依賴

建立被觀察者、觀察者、訂閱

一、建立操作符

1.1 create()

1.2 just()    最多不超過10個

1.3 From操作符

1.3.1 fromArray(). 

1.3.2 fromCallable().  

1.3.3 fromFuture()

1.3.4 fromIterable().

1.4 defer().

1.5 timer().

1.6 interval().  

1.7 intervalRange() 

1.8 range 和rangeLong

1.9 empty() ,nerver(),error()

二、轉換操作符

2.1  map()  

2.2 flatMap()

2.3 concatMap()

2.4 buffer()

2.5 groupBy()

2.6 scan()

2.7 window()

三、 組合操作符

3.1 concat()

3.2 concatArray()

3.3 merge() 和mergeArray()

3.4 concatArrayDelayError()和mergeArrayDelayError()

3.5 zip()

3.6 combineLatest() 和combineLatestDelayError()

3.7 reduce()

3.8 collect()

3.9 startWith()&startWithArray()

3.10 count()

四、 功能操作符

4.1 delay()

4.2 doOnEach()

4.3 doOnNext()

4.4  doAfterNext()

4.5 doOnComplete()

4.6 doOnError()

4.7 doOnSubscribe()

4.8 doOnDispose()

4.9 doOnLifecycle()

4.10 doOnTerminate() he doAfterTerminate()

4.11 doFinally()

4.12 onErrorReturn()

4.13 onErrorResumeNext()

4.14 onExceptionResumeNext()

4.15 retry()

4.16 retryUntil()

4.17 retryWhen()

4.18 repeat()

4.19 repeatWhen()

4.20 subscribeOn 和observeOn()

五、過濾操作符

5.1 filter()

5.2 ofType()

5.3 skip()和skipLast

5.4 distinct()

5.5 distinctUntilChanged()

5.6 take() 和takeLast()

5.7 debounce()

5. 8 throttleWithTimeout()

5.9 firstElement() 和lastElement

5.10 elementAt() 和elementAtOrError()

六、條件操作符

6.1 all()

6.2 takeWhile()

6.3 skipWhile()

6.4 takeUntil()

6.5 skipUntil()

6.6 sequenceEqual()

6.7 contains()

6.8 isEmpty()

6.9 amb()

6.10 defaultIfEmpty()

參考

1.玉剛說《RxJava 隻看這一篇文章就夠了 (上、中、下)》

https://mp.weixin.qq.com/s/RkGHpVSpngfHDXo4Es-a9w

https://mp.weixin.qq.com/s/elA3Gib57YGWnXOEcFOPUQ

https://mp.weixin.qq.com/s/WaWEtFjmajalISwAkJyuKw

2. Season_zlc 《 給初學者的RxJava2.0教程(一~十) 》

給出作者《簡書》首頁

https://www.jianshu.com/u/c50b715ccaeb

添加依賴

implementation 'io.reactivex.rxjava2:rxjava:2.1.17'
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
           

建立被觀察者、觀察者、訂閱

public class Test {
    public static void main(String[] args)
    {
        //建立被觀察者
        Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onNext(5);
                emitter.onComplete();
            }
        });

        //建立觀察者
        Observer observer=new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };

        //建立連接配接
        observable.subscribe(observer);
    }
}
           

一、建立操作符

1.1 create()

//建立被觀察者
        Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onNext(5);
                emitter.onComplete();
            }
        });
           

1.2 just()    最多不超過10個

//建立被觀察者
        Observable observable=Observable.just(1,2,3,4,5);
           

1.3 From操作符

1.3.1 fromArray(). 

利用這個可以彌補just方法的缺點,可以多餘10個

//建立被觀察者
        Integer[] ints= new Integer[]{new Integer(0),new Integer(1),new Integer(3)};
        Observable observable=Observable.fromArray(ints);
           

1.3.2 fromCallable().  

它隻會傳回一個結果

//建立被觀察者
       Observable observable=Observable.fromCallable(new Callable() {
           @Override
           public Object call() throws Exception {
               return 1;
           }
       });
           

1.3.3 fromFuture()

public class Test {
    public static void main(String[] args)
    {

        final FutureTask<String> futureTask=new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("Callable is running");
                return "result";
            }
        });

        //建立被觀察者  doOnsubscribe隻有在訂閱的時候才發送事件
        Observable observable=Observable.fromFuture(futureTask)
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        futureTask.run();
                    }
                });

        //建立觀察者
        Observer observer=new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };

        //建立連接配接
        observable.subscribe(observer);
    }
}
           

1.3.4 fromIterable().

直接發送一個list清單

List<String> list=new ArrayList<>();
        list.add("hello");
        list.add("world");
        //建立被觀察者
        Observable<String> observable=Observable.fromIterable(list);
           

1.4 defer().

 直到被觀察者被訂閱才建立被觀察者

String string="hello";
        //建立被觀察者  
        final Observable<String> observable=Observable.defer(new Callable<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> call() throws Exception {
                return Observable.just(string);
            }
        });
           

意思是隻要是沒有被訂閱之前,string的值改了那麼輸出就變了;

1.5 timer().

到達指定時間之後就發送一個OL值給觀察者

Observable.timer(2, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println(aLong);
                    }
                });
           

1.6 interval().  

每隔一段時間發送一個事件,從0開始,不斷加1

Observable.interval(2,TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println(aLong);
                    }
                });
           

參數如下:時間間隔,時間機關,還有一種,接受事件之前的初始化事件,時間間隔,時間機關

1.7 intervalRange() 

public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
           

1.8 range 和rangeLong

Observable.range(2,5)
               .subscribe(new Consumer<Integer>() {
                   @Override
                   public void accept(Integer integer) throws Exception {
                       System.out.println(integer);
                   }
               });

       Observable.rangeLong(2,4)
               .subscribe(new Consumer<Long>() {
                   @Override
                   public void accept(Long aLong) throws Exception {
                       System.out.println(aLong);
                   }
               });
           

1.9 empty() ,nerver(),error()

empty()直接發送onComplate();

nerver()不發送任何事件

error()直接發送onError()

注意,都會回調onSubscribe()方法

Observable.empty().subscribe(new Observer<Object>() {
           @Override
           public void onSubscribe(Disposable d) {
               System.out.println("onSubscribe");
           }

           @Override
           public void onNext(Object o) {
             
           }

           @Override
           public void onError(Throwable e) {
               System.out.println("onError");
           }

           @Override
           public void onComplete() {
               System.out.println("onComplete");
           }
       });
           

二、轉換操作符

2.1  map()  

将發送的資料類型轉換成其他類型,而且還可以針對每一個發送的事件分别做不同的反應

Observable.just(1,2,4)
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) {
                        return "this is "+integer.toString();
                    }
                }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
           

2.2 flatMap()

類似map()方法,但是功能更為強大,比如說,上遊發送三個資料(1,2,3)經過flatmap操作符,在内部将對應着三個資料做不同的改變,最後傳回一個Observable對象,利用這個對象我們可以将變換的對象發出去,但是是無序的。

最大的不同就是可以傳回一個Observable對象

Observable.just(1,2,3)
                .flatMap(new Function<Integer, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Integer integer) throws Exception {
                        List <String> list=new ArrayList<>();
                        list.add("this is "+integer.toString());
                        list.add("hello every on");
                        list.add("welcome");
                        return Observable.fromIterable(list);
                    }
                }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println(o);
            }
        });
           

2.3 concatMap()

這個轉換符和flatmap基本相同,唯一差別就是嚴格按照發送順序接受。

2.4 buffer()

public final Observable<List<T>> buffer(int count, int skip)
           

count代表緩沖區的最大元素數量,skip表示當緩沖區資料發送完畢之後,再次填充緩沖區要在原來位置上跳過幾個元素,比如說1,2,3,4,5 ,count=2,skip=1,那麼當第一次緩沖區為1,2發送完畢之後,再次入緩沖區的元素就是2,3。

Observable.just(1,2,3,4,5)
                .buffer(2,1)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        System.out.println(integers);
                    }
                });
           
RxJava 學習筆記

2.5 groupBy()

将發送的資料進行分組,每個組都會傳回一個Oberverable

Observable.just(1,2,3,4,5,7,8,9,10)
                .groupBy(new Function<Integer, Object>() {
                    @Override
                    public Object apply(Integer integer) throws Exception {
                        return integer%5;
                    }
                }).subscribe(new Consumer<GroupedObservable<Object, Integer>>() {
            @Override
            public void accept(GroupedObservable<Object, Integer> objectIntegerGroupedObservable) throws Exception {
                System.out.println("this is :"+objectIntegerGroupedObservable.getKey());

                objectIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });
            }
        });
           
RxJava 學習筆記

2.6 scan()

将資料按照一定的邏輯聚合起來

Observable.just(1,3,5,4,5,7)
                .scan(new BiFunction<Integer, Integer, Integer>() {     //結合方式
                    @Override
                    //起始第一個參數integer 1,起始第二個參數 2,integer 将存儲最終值
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        System.out.println("integer: "+integer);
                        System.out.println("integer1: "+integer2);
                        return integer+integer2;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("最終資料:"+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
           
RxJava 學習筆記

2.7 window()

發送指定數量的事件的時候,就将這些事件分為一組。

Observable.just(1,2,3,4,5)
                .window(2)
                .subscribe(new Observer<Observable<Integer>>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Observable<Integer> integerObservable) {
                        integerObservable
                                .subscribe(new Observer<Integer>() {
                                    @Override
                                    public void onSubscribe(Disposable d) {
                                        System.out.println("onSubscribe----");
                                    }

                                    @Override
                                    public void onNext(Integer integer) {
                                        System.out.println(integer);
                                    }

                                    @Override
                                    public void onError(Throwable e) {
                                        System.out.println("onError----");
                                    }

                                    @Override
                                    public void onComplete() {
                                        System.out.println("onComplete----");
                                    }
                                });
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
           
RxJava 學習筆記

三、 組合操作符

3.1 concat()

将多個觀察者組合在一起,按照順序發送事件。最多隻能組合四個觀察者。

Observable.concat(Observable.just(1,2)
                ,Observable.just(3,4)
                ,Observable.just(5,6)
                ,Observable.just(7,8))
                .subscribe(new Consumer<Number>() {
                    @Override
                    public void accept(Number number) throws Exception {
                        System.out.println(number);
                    }
                });
           

3.2 concatArray()

和concat()一樣,但是不限于4個

3.3 merge() 和mergeArray()

和concat與concatArray一樣,但是這是并行發送,之前的是串行發送。

Observable.merge(
                Observable.interval(2,TimeUnit.SECONDS)
                .map(new Function<Long,String>() {
                    @Override
                    public String apply(Long aLong) throws Exception {
                        return "A "+aLong;
                    }
                })
                ,Observable.interval(2,TimeUnit.SECONDS)
                .map(new Function<Long,String>() {
                    @Override
                    public String apply(Long aLong) throws Exception {
                        return "B "+aLong;
                    }
                })
        ).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
           

3.4 concatArrayDelayError()和mergeArrayDelayError()

concatArray和mergeArray都是将多個觀察者組合在一起發送的,當其中一個發送了Error事件,那麼observer将不接受消息,那麼标題所說的兩個方法可以讓所有事件都發送完畢再執行onError()

Observable.concatArray(
                Observable.create(new ObservableOnSubscribe<Object>()
                {
                    @Override
                    public void subscribe(ObservableEmitter<Object> emitter) throws Exception
                    {
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onError(new NumberFormatException());
                    }
                })
                ,Observable.create(new ObservableOnSubscribe<Object>()
                {
                    @Override
                    public void subscribe(ObservableEmitter<Object> emitter) throws Exception
                    {
                        emitter.onNext(4);
                        emitter.onNext(5);
                        emitter.onNext(6);
                    }
                })
        ).subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Object o) {
                        System.out.println(o);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
           
RxJava 學習筆記

3.5 zip()

多個被觀察者在一起, 将他們各自發送的資料,依次拿出來進行處理後當作一個事件發出去。發送事件的個數,就短原則

Observable.zip(
                Observable.just(1, 2, 3, 4, 5)
                        .map(new Function<Integer, Object>() {
                            @Override
                            public Object apply(Integer integer) throws Exception {
                                return "A " + integer.toString();
                            }
                        })
                , Observable.just(6, 7, 8, 9)
                        .map(new Function<Integer, Object>() {
                            @Override
                            public Object apply(Integer integer) throws Exception {
                                return "B " + integer.toString();
                            }
                        })
                , new BiFunction<Object, Object, Object>() {  //組合方式
                    @Override
                    public Object apply(Object o, Object o2) throws Exception {
                        return o.toString()+" "+o2.toString();
                    }
                }
        ).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete(){
                System.out.println("onComplete"); 
            }
        });
           
RxJava 學習筆記

3.6 combineLatest() 和combineLatestDelayError()

發送時間錯開之後,總是結合兩個被觀察者最新的事件,隻要有一方沒有産生那麼就不會産生結果

Observable.combineLatest(
                Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
                        .map(new Function<Long, Object>() {
                            @Override
                            public Object apply(Long aLong) throws Exception {
                                return "A 發送 "+aLong;
                            }
                        })
                , Observable.intervalRange(2, 6, 2, 2, TimeUnit.SECONDS)
                        .map(new Function<Long, Object>() {
                            @Override
                            public Object apply(Long aLong) throws Exception {
                                return "B 發送 "+aLong;
                            }
                        })
                , new BiFunction<Object, Object, Object>() {
                    @Override
                    public Object apply(Object o, Object o2) throws Exception {
                        return "最終結合發送資料: "+o+" "+o2;
                    }
                }
        ).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
           
RxJava 學習筆記

3.7 reduce()

與scan()操作符類似,也是發送資料以一定的方式結合起來,不同的是scan每次結合都發送一次結果值,而reduce直接發送最終結果

Observable.just(1,3,5,4,5,7)
                .reduce(new BiFunction<Integer, Integer, Integer>() {     //結合方式
                    @Override
                    //起始第一個參數integer 1,起始第二個參數 2,integer 将存儲最終值
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        System.out.println("integer: "+integer);
                        System.out.println("integer1: "+integer2);
                        return integer+integer2;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("最終資料: "+integer);
                    }
                });
           
RxJava 學習筆記

3.8 collect()

将資料收集到資料結構當中

Observable.just(1,2,3,3,4,5,5,6,7)
                //傳回資料結構
                .collect(new Callable<ArrayList<Integer>>() {   
                    @Override
                    public ArrayList<Integer> call() throws Exception {
                        ArrayList<Integer> list=new ArrayList<>();
                        return list;
                    }
                }
                //向資料結構添加資料
                ,new BiConsumer<ArrayList<Integer>,Integer>() {
                    @Override
                    public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
                        list.add(integer);
                    }
                })
                .subscribe(new Consumer<ArrayList<Integer>>() {
                    @Override
                    public void accept(ArrayList<Integer> list) throws Exception {
                        System.out.println(list);
                    }
                });
           
RxJava 學習筆記

3.9 startWith()&startWithArray()

發送事件之前追加事件,前者隻能追加一個事件,後者可以追加多個事件。追加的事件會先發出。

Observable.just(5,5,5,5,5)
                .startWith(1)
                .startWithArray(2,2,2,2,2)   
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });
           

越後聲明的事件将會先輸出

RxJava 學習筆記

3.10 count()

傳回被觀察者發送事件的數量而不發送事件

Observable.just(5,5,5,5,5)
                .startWith(2)
                .count()
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println(aLong);
                    }
                });
           
RxJava 學習筆記

四、 功能操作符

4.1 delay()

延遲一段時間發送事件

Observable.just(5,5,5,5,5)
                .delay(2,TimeUnit.SECONDS)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });
           

4.2 doOnEach()

被觀察者每次發送一個事件之前都會回調這個方法,onComplage也會調用

Observable.just(2,4,5,6,1)
                .doOnEach(new Consumer<Notification<Integer>>() {
                    @Override
                    public void accept(Notification<Integer> integerNotification) throws Exception {
                        System.out.println("将要發送: "+integerNotification);
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("接收:"+integer);
                    }
                });
           
RxJava 學習筆記

4.3 doOnNext()

被觀察者每次發送onNext()之前都會回調這個方法

Observable.just(2,4,5,6,1)
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("doOnNext :"+integer);
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext "+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
           

RxJava 學習筆記

4.4  doAfterNext()

顧名思義,每次發送onNext()之後回調

Observable.just(2,4,5,6,1)
                .doAfterNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("doAfterNext :"+integer);
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext "+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
           
RxJava 學習筆記

4.5 doOnComplete()

被觀察者每次發送onComplete之前回調

4.6 doOnError()

被觀察者每次發送onError之前回調

4.7 doOnSubscribe()

被觀察者每次發送onSubscribe()之前回調

Observable.just(2,4,5,6,1)
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        System.out.println("doOnsubscribe");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext "+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
           
RxJava 學習筆記

4.8 doOnDispose()

當調用Disposable的dispose()之後回調這個方法

Observable.just(2,4,5,6,1)
                .doOnDispose(new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("doOnDispose");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    Disposable disposable;
                    @Override
                    public void onSubscribe(Disposable d) {
                        disposable=d;
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("onNext "+integer);
                        if(integer.equals(new Integer(4)))
                        {
                            disposable.dispose();
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
           
RxJava 學習筆記

4.9 doOnLifecycle()

public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)
           

第一個參數的回調方法在回調onSubscribe()方法之前回調,第二個參數的回調方法在調用Disposable的dispose()方法之後被回調

就是接受資料之前調用,斷開連接配接之後再調用

Observable.just(1,2,3,4,5,6)
                .doOnLifecycle(new Consumer<Disposable>() {
                    @Override     //在回調onSubscribe()之前調用
                    public void accept(Disposable disposable) throws Exception {
                        System.out.println("on accept");
                    }
                    }
                    , new Action() {
                    @Override   //在Disposable的dispose方法之後調用
                    public void run() throws Exception {
                        System.out.println("on Action");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    Disposable disposable;

                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                        disposable=d;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(integer);
                        disposable.dispose();
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
           
RxJava 學習筆記

4.10 doOnTerminate() he doAfterTerminate()

顧名思義就是分别在結束之前調用和在結束之後調用,這裡的結束就是回調onError或者onComplete方法

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onComplete();
            }
        })
                .doOnTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("Action");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
           
RxJava 學習筆記

4.11 doFinally()

所有的事件發送完了之後調用,包括Error和Complete

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onComplete();
            }
        }).doFinally(new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("end");
            }
        })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
           
RxJava 學習筆記

4.12 onErrorReturn()

當接收到onError事件之後,回調這個方法(不回調onError方法),這個方法的傳回值會回調onNext方法(一般用來監測錯誤原因),并正常結束該事件序列

Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onNext(5);
                emitter.onError(new NullPointerException());
            }
        }).onErrorReturn(new Function<Throwable, Object>() {
            @Override
            public Object apply(Throwable throwable) {
                System.out.println("onErrorReturn: "+throwable);
                return 404;
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
           
RxJava 學習筆記

4.13 onErrorResumeNext()

當接收到onError事件之後回調這個方法,并且傳回一個新的Observable,可以用來錯誤彌補(并不會回調onError方法)

Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onError(new NullPointerException());
                emitter.onNext(5);
            }
        }).onErrorResumeNext(new Function<Throwable, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Throwable throwable) throws Exception {
                System.out.println("onErrorResumeNext:"+throwable);
                return Observable.just(6,7,8);
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
           

傳回結果裡面并沒有5這個數字

RxJava 學習筆記

4.14 onExceptionResumeNext()

與 onErrorResumeNext()作用基本一緻,但是這個隻能用來撲捉Exception,不能用來撲捉Error事件

Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onError(new NullPointerException());  //産生異常
                emitter.onNext(5);
            }
        }).onExceptionResumeNext(new Observable<Integer>() {
            @Override
            protected void subscribeActual(Observer observer) {
                observer.onNext(10);
                observer.onComplete();
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
           
RxJava 學習筆記

4.15 retry()

如果出現錯誤事件就會重新發送所有事件,參數是重發的次數,onError隻是最後回調一次

Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onError(new NullPointerException());
            }
        }).retry(3).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
           
RxJava 學習筆記

4.16 retryUntil()

出現錯誤事件之後,可以通過這個功能操作符判斷是否繼續發送事件,return true表示否,return fales表示繼續重複發送

Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onError(new NullPointerException());
            }
        }).retryUntil(new BooleanSupplier() {
            @Override
            public boolean getAsBoolean() throws Exception {
                return true;   //禁止
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
           
RxJava 學習筆記

4.17 retryWhen()

當被觀察者接收到異常或者錯誤事件的時候會回調這個方法,這個方法傳回一個新的被觀察者,如果被觀察者傳回的是Error事件那麼之前的觀察者就不會繼續發送事件,如果被觀察者發送的事正常事件,那麼之前的被觀察者将不斷重複發送事件。

Observable.create(new ObservableOnSubscribe < String > () {
            @Override
            public void subscribe(ObservableEmitter < String > e) throws Exception {
                e.onNext("1111");
                e.onNext("2222");
                e.onNext("3333");
                e.onError(new Exception("404"));
                e.onNext("4444");
            }
        })
                .retryWhen(new Function < Observable < Throwable > , ObservableSource <? >> () {
                    @Override
                    public ObservableSource <? > apply(Observable < Throwable > throwableObservable) throws Exception {
                        return throwableObservable.flatMap(new Function < Throwable, ObservableSource <? >> () {
                            @Override
                            public ObservableSource <? > apply(Throwable throwable) throws Exception {
                                if(!throwable.toString().equals("java.lang.Exception: 404")) {
                                    return Observable.just("忽略這個異常");
                                } else {
                                    return Observable.error(new Throwable("終止"));
                                }
                            }
                        });
                    }
                })
                .subscribe(new Observer < String > () {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }
                    @Override
                    public void onNext(String s) {
                        System.out.println(s);
                    }
                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }
                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
           
RxJava 學習筆記

4.18 repeat()

重複發送被觀察者的事件,參數為重複發送的次數

Observable.just(1,2,3,4)
                .repeat(2)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });
           
RxJava 學習筆記

4.19 repeatWhen()

增加條件,是否重複發送事件,傳回一個新的被觀察者

和retrywhen()差不多

4.20 subscribeOn 和observeOn()

前者指定被觀察者運作的線程,後者指定觀察者的運作線程

Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                System.out.println("observable:"+Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io())
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                        System.out.println("observer:"+Thread.currentThread().getName());
                    }

                    @Override
                    public void onNext(Object o) {
                        System.out.println(o);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");

                    }
                });
           
RxJava 學習筆記

可以指定的值如下:

排程器 作用
Schedulers.newThread()
           
代表一個正常的新線程
Schedulers.io();
           
代表io操作的線程, 通常用于網絡,讀寫檔案等io密集型的操作
Schedulers.computation();
           
代表CPU計算密集型的操作, 例如需要大量計算的操作
AndroidSchedulers.mainThread();
           
代表Android的主線程

五、過濾操作符

5.1 filter()

自定義一定的邏輯,來過濾被觀察者發送的事件

Observable.just("hel","hello","word","world")
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(String s) throws Exception {
                        return s.startsWith("h");
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                });
           
RxJava 學習筆記

5.2 ofType()

顧名思義,除了參數所指定的類型,其他的都過濾

Observable.just(1,2,3,4,"hello","world")
                .ofType(String.class)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                });
           
RxJava 學習筆記

5.3 skip()和skipLast

按照順序跳過參數所指定的數量的事件,前者正序後者反序

Observable.just(1,2,3,4,5)
                .skip(3)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });
           
RxJava 學習筆記

5.4 distinct()

過濾重複事件

Observable.just(1,2,3,4,3)
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });
           
RxJava 學習筆記

5.5 distinctUntilChanged()

過濾重複連續事件

5.6 take() 和takeLast()

控制觀察者接收事件的數量,前者正序,後者反序

Observable.just(1,2,3,4,3)
                .take(4)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });
           
RxJava 學習筆記

5.7 debounce()

過濾那些發送間隔時間小于參數指定時間的事件

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                Thread.sleep(800);
                emitter.onNext(2);
            }
        }).debounce(1,TimeUnit.SECONDS)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("onSubscribe");
                    }

                    @Override
                    public void onNext(Integer o) {
                        System.out.println(o);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("onComplete");
                    }
                });
           

5. 8 throttleWithTimeout()

和上面的方法作用一樣

5.9 firstElement() 和lastElement

顧名思義觀察者隻接收第一個事件(最後一個事件)

5.10 elementAt() 和elementAtOrError()

前者可以讓接受者接收事件序列的指定事件,但是如果index超出了序列的範圍的話并不會發出任何異常,隻不過沒有資料被接收,如果想要發出異常用後者。 index 從0開始

Observable.just(1,2,3,4)
                .elementAt(0)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });
           
RxJava 學習筆記

六、條件操作符

6.1 all()

判斷事件序列是否全部滿足某個條件,如果是傳回true

Observable.just(1,2,3,4)
                .all(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer>8;
                    }
                })
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean b) throws Exception {
                        System.out.println(b);
                    }
                });
           
RxJava 學習筆記

6.2 takeWhile()

功能和filter一樣,過濾不滿足條件的事件.

6.3 skipWhile()

和takeWhile()作用相反

6.4 takeUntil()

接受者接收滿足條件的事件之前的事件(包括本事件)

Observable.just(1,2,3,4,2)
                .takeUntil(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer>3;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });
           
RxJava 學習筆記

6.5 skipUntil()

當skipUntil()裡的資料發送完畢才接收被觀察者的資料,但是發送skipUntil()裡的資料的時候,被觀察者也在發送資料,但是沒有被接收,是以隻能接收到後面的資料

Observable.intervalRange(1,5,0,1,TimeUnit.SECONDS)
                .skipUntil(Observable.intervalRange(6,5,3,1,TimeUnit.SECONDS))
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        System.out.println(aLong);
                    }
                });
           

6.6 sequenceEqual()

判斷連個被觀察者發送的事件是否一樣

Observable.sequenceEqual(Observable.just(1,2,3)
                ,Observable.just(1,2,3))
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                        System.out.println(aBoolean);
                    }
                });
           
RxJava 學習筆記

6.7 contains()

判斷被觀察者發送的事件序列裡面有沒有某個事件

Observable.just(1,2,3)
                .contains(2)
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                        System.out.println(aBoolean);
                    }
                });
           
RxJava 學習筆記

6.8 isEmpty()

判斷事件序列是否為空,不包括onError和onComplete

6.9 amb()

amb() 要傳入一個 Observable 集合,但是隻會發送最先發送事件的 Observable 中的事件,其餘 Observable 将會被丢棄。

List<Observable<Integer>> list=new ArrayList<>();
        list.add(Observable.just(1,2,3,4).delay(3,TimeUnit.SECONDS));
        list.add(Observable.just(5,6,7));

        Observable.amb(list)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });
           
RxJava 學習筆記

6.10 defaultIfEmpty()

如果被觀察者隻發送了一個oncomplete那麼可以利用這個方法發送一個值

Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).defaultIfEmpty(1)
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        System.out.println(o);
                    }
                });
           
RxJava 學習筆記