天天看點

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

目錄

  • 一、RxJava的基礎使用
    • 1. 定義
    • 2. 作用
    • 3. 特點
    • 4.Rxjava原理介紹
    • 5. 基本使用
      • 5.1.1 使用步驟
      • 5.1.2 步驟詳解
  • 二、RxJava操作符
    • 1、建立操作符 :建立被觀察者(Observable)對象&發送事件
      • 1)Create() 操作符
      • 2)Just() 操作符
      • 3)fromIterable() 操作符
      • 4)timer()操作符
      • 5)fromArray() 操作符
      • 6)interval() 定時器
      • 7)intervalRange() 操作符
      • 8)Range() 操作符
    • 2、轉換操作符:變換被觀察者(Observable)發送的事件。将Observable發送的資料按照一定的規則做一些變換,然後再将變換的資料發射出去。變換的操作符有map,flatMap,concatMap,switchMap,buffer,groupBy等等。
      • 1)map()操作符
      • 2)flatMap()操作符
      • 3)concatMap() 操作符
      • 4)buffer() 操作符
    • 3、 合并操作符:組合多個被觀察者(Observable)&合并需要發送的事件。包含:concatMap(),concat(),merge(),mergeArray(),concateArray(),reduce(),collect(),startWith(),zip(),count()等
      • 1)merge(),concat ()操作符
      • 2)concatDelayError()/mergeDelayError() 操作符
      • 3)zip 操作符
      • 4)combineLatest 操作符
      • 5)reduce ()操作符
      • 6)collect() 操作符
      • 7)startWith()/startWithArray() 操作符
      • 8)count() 操作符
    • 4、功能操作符:輔助被觀察者(Observable) 發送事件時實作一些功能性需求,如錯誤處理,線程排程。
      • 1) subscribe() 操作符
      • 2) delay() 操作符
      • 3) do 系列操作符
      • 4) onErrorReturn() 操作符
      • 5) onExceptionResumeNext()/onErrorResumeNext() 操作符
      • 6) retry() 操作符
      • 7) retryUntil() 操作符
      • 8) retryWhen() 操作符
      • 9) repeat() 操作符
      • 10)repeatWhen() 操作符
      • 11)debounce() 操作符
      • 12)subscribeOn / ObserverOn 操作符
    • 5、過濾操作符:用于将Observable發送的資料進行過濾和選擇。讓Observable傳回我們所需要的資料。過濾操作符有buffer(),filter(),skip(),take(),skipLast(),takeLast(),throttleFirst(),distainctUntilChange()。
      • 1)filter() 操作符
      • 2)distinct() 操作符
      • 3)distinctUntilChanged() 操作符
      • 4)skip(),skipLast()操作符
      • 5)take() 操作符
      • 6)takeLast() 操作符
      • 7)elementAt() 操作符
      • 8)elementAtOrError()操作符
      • 9)ignoreElements() 操作符
      • 10)ofType() 操作符
      • 11)throttleFirst()/throttleLast() 操作符
      • 12)sample()
      • 13)firstElement()/lastElement()
    • 6、條件 / 布爾操作符:通過設定函數,判斷被觀察者(Observable)發送的事件是否符合條件

一、RxJava的基礎使用

1. 定義

RxJava 在 GitHub 的介紹:

RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
// 翻譯:RxJava 是一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程式的庫
           

總結:RxJava 是一個 基于事件流、實作異步操作的庫

2. 作用

實作異步操作

類似于 Android中的 AsyncTask 、Handler作用

3. 特點

由于 RxJava的使用方式是:基于事件流的鍊式調用,是以使得 RxJava:

  1. 邏輯簡潔
  2. 實作優雅
  3. 使用簡單

    更重要的是,随着程式邏輯的複雜性提高,它依然能夠保持簡潔 & 優雅

4.Rxjava原理介紹

  • Rxjava原理 基于 一種擴充的觀察者模式
  • Rxjava的擴充觀察者模式中有4個角色:
角色 作用 類比
被觀察者(Observable) 産生事件 顧客
觀察者(Observer) 接收事件,并給出響應動作 廚房
訂閱(Subscribe) 連接配接 被觀察者 & 觀察者 服務員
事件(Event) 被觀察者 & 觀察者 溝通的載體 菜式

具體原理

結合 顧客到飯店吃飯 的生活例子了解:

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

即RxJava原理可總結為:被觀察者 (Observable) 通過 訂閱(Subscribe) 按順序發送事件 給觀察者 (Observer), 觀察者(Observer) 按順序接收事件 & 作出對應的響應動作。具體如下圖:

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

5. 基本使用

5.1.1 使用步驟

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

5.1.2 步驟詳解

加入依賴:

compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
           

具體實作:

public class MainActivity extends AppCompatActivity {
		    private static final String TAG = "Rxjava";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);		

	// 步驟1:建立被觀察者 Observable & 生産事件
	// 即 顧客入飯店 - 坐下餐桌 - 點菜
    
    //  1. 建立被觀察者 Observable 對象
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        // 2. 在複寫的subscribe()裡定義需要發送的事件
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // 通過 ObservableEmitter類對象産生事件并通知觀察者
            // ObservableEmitter類介紹
                // a. 定義:事件發射器
                // b. 作用:定義需要發送的事件 & 向觀察者發送事件
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    });

	// 步驟2:建立觀察者 Observer 并 定義響應事件行為
	// 即 開廚房 - 确定對應菜式
    
    Observer<Integer> observer = new Observer<Integer>() {
        // 通過複寫對應方法來 響應 被觀察者
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "開始采用subscribe連接配接");
        }
        // 預設最先調用複寫的 onSubscribe()

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "對Next事件"+ value +"作出響應"  );
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "對Error事件作出響應");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "對Complete事件作出響應");
        }
    };

    
    // 步驟3:通過訂閱(subscribe)連接配接觀察者和被觀察者
    // 即 顧客找到服務員 - 點菜 - 服務員下單到廚房 - 廚房烹調
    observable.subscribe(observer);
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

基于事件流的鍊式調用方式:

public class MainActivity extends AppCompatActivity {

private static final String TAG = "Rxjava";

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

// RxJava的流式操作
        Observable.create(new ObservableOnSubscribe<Integer>() {
        // 1. 建立被觀察者 & 生産事件
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            // 2. 通過通過訂閱(subscribe)連接配接觀察者和被觀察者
            // 3. 建立觀察者 & 定義響應事件的行為
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "開始采用subscribe連接配接");
            }
            // 預設最先調用複寫的 onSubscribe()

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "對Next事件"+ value +"作出響應"  );
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "對Error事件作出響應");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "對Complete事件作出響應");
        }

    });
}
           

}

二、RxJava操作符

1、建立操作符 :建立被觀察者(Observable)對象&發送事件

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

1)Create() 操作符

/**
        *
        *  =======================基本用法Create====================================
        *
        *  Observable 被觀察者
        *
        *  ObservableOnSubscribe  觀察者與被觀察者的橋接(事件發射器)
        *
        *  ObServer 觀察者
        *
        *   被觀察者  -->  觀察者與被觀察者的橋接  --> 觀察者
        *
        *   被觀察者.create(觀察者與被觀察者的橋接).subscribe(觀察者)
        *
        */

    Observable
            .create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    for (int i = 0; i < 10; i++) {
                        emitter.onNext(String.valueOf(i));
                    }
                    emitter.onComplete();
                }
            })
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String s) {
                    Log.d(TAG, s);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "complete");
                }
            });
}
           

輸出如下:

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

2)Just() 操作符

/**
     * ====================just 操作符 ====================
     * 
     * 此操作符的作用是将傳入的資料依次發送出去.最多可以傳10個參數
     * 
     * 以下代碼會依次把 1-10的字元串發送出去。執行10此觀察者的onNext方法,最後預設執行onComplete方法
     */
    public static void just() {

    Observable
            .just("1", "2", "3", "4", "5",
                    "6", "7", "8", "9", "10")
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String s) {
                    Log.d(TAG + "just", s);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "just", "complete");
                }
            });
}
           

輸出如下:

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

3)fromIterable() 操作符

/**
 * ====================fromIterable 操作符 ====================
 * 
 * 此操作符的作用是将傳入的數組集合按腳标依次發送出去
 * 
 * 以下代碼會依次把 0-9的字元串發送出去。執行10此觀察者的onNext方法,最後預設執行onComplete方法
 */

public static void fromIterable() {

    List<String> list = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        list.add(String.valueOf(i));
    }

    Observable
            .fromIterable(list)
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String s) {
                    Log.d(TAG + "fromIterable", s);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "fromIterable", "complete");
                }
            });
}
           

輸出如下:

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

4)timer()操作符

/**
     * ==========================timer操作符 ==============================
     * 
     * 延遲指定時間發送一個0數值(Long類型)
     * 
     * timer操作符主要運作在一個新線程中,也可以自定義線程排程器(第三個參數)
     */
    public static void timer() {

    Observable
            .timer(2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.d(TAG + "timer", String.valueOf(aLong));
                }
            });

}
           

輸出如下:

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

5)fromArray() 操作符

/**
     * ====================fromArray 操作符============================
     * 
     * 對一個數組集合進行觀察,把數組一次性發給觀察者,隻會執行一次觀察者的onNext,最後預設執行onComplete方法
     */
    public static void fromArray() {

    List<String> list = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        list.add(String.valueOf(i));
    }
    Observable
            .fromArray(list)
            .subscribe(new Observer<List<String>>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(List<String> strings) {
                    Log.d(TAG + "fromArray", strings.toString());
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "fromArray", "complete");
                }
            });
}
           

輸出如下:

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

6)interval() 定時器

/**
     * ====================interval  定時器====================
     * 
     * 這個相當于定時器,用它可以取代CountDownTimer。它會按照設定的間隔時間,每次發送一個事件,發送的事件序列:預設從0開始,無限遞增的整數序列
     * 
     * 以下代碼輸出:   0 ----(5秒後)-----1-----(5秒後)------2---------(5秒後)--------3-------(5秒後)-----.......
     */
    public static void interval() {

    Observable
            .interval(5, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.d(TAG + "interval", String.valueOf(aLong));//從0開始輸出
                }
            });
}
           

輸出如下:

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

7)intervalRange() 操作符

/**
     * intervalRange  操作符
     * <p>
     * 作用和interval相同,但可以指定發送資料的數量
     */
    public static void intervalRange() {


    /**
     *  參數1: 起始發送值
     *  參數2:發送數量
     *  參數3:首次發送延遲事件
     *  參數4:每次發送事件間隔
     *  參數5:時間機關
     *
     */
    Observable
            .intervalRange(2, 10, 3, 1, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.d(TAG + "intervalRan", String.valueOf(aLong));//從2開始輸出
                }
            });
}
           

輸出如下:

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

8)Range() 操作符

/**
     * Range  操作符
     * <p>
     * 作用發送指定範圍的序列,可指定範圍.作用類似intervalRange,但不同的是range是無延遲發送
     */
    public static void range() {

    Observable
            .range(2, 6)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "interval", String.valueOf(integer));//從2開始輸出
                }
            });
}
           

輸出如下:

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

2、轉換操作符:變換被觀察者(Observable)發送的事件。将Observable發送的資料按照一定的規則做一些變換,然後再将變換的資料發射出去。變換的操作符有map,flatMap,concatMap,switchMap,buffer,groupBy等等。

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

1)map()操作符

/**
     * ======================map============================
     * 
     * map操作符,可以說是的被觀察者轉換器。 通過指定一個Funcation對象,将被觀察者(Observable)轉換成新的被觀察者(Observable)對象并發射,觀察者會收到新的被觀察者并處理
     * 
     * 
     * 本來發射的資料是 數字1,然後觀察者接收到的是 “ 這是新的觀察資料===: 1”
     * 
     * 流程:  被觀察者.create(事件發射器).map(轉換器).subscribe(觀察者)
     */

public static void map() {
    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 10; i++) {
                        emitter.onNext(i);
                    }
                    emitter.onComplete();
                }
            })
            .map(new Function<Integer, String>() {
                @Override
                public String apply(Integer integer) throws Exception {
                    return "這是新的觀察資料===:" + integer;

                }
            })
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG + "map", s);
                }
            });
}
           

輸出如下:

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

2)flatMap()操作符

/**
     * ======================flatMap============================
     * 
     * flatMap操作符, 将Observable每一次發射的事件都轉換成一個Observable,也就是說把Observable的發射事件集合轉換成Observable集合。
     * 然後觀察者Observer最終觀察的是Observable集合。但是觀察者不能保證接收到這Observable集合發送事件的順序。
     * 
     * 是不是很抽象? 先來看看這一個流程:  觀察者.create(事件發射器).flatMap(轉換器).subscribe(觀察者)
     * 
     * 再來看看例子 : 下面的代碼,一開始Observable通過發射器的onNext發送了0-9這10個事件發送出去,正常來說Observer接收到就是 0 - 9 這10個資料
     * 然而中間經過了flatMap的轉換。這 10個事件都分别在Funcation轉換器上新的Observable。而最終觀察者接收的就是這10個新的Observable的發送事件。
     */

public static void flatMap() {

    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 10; i++) {
                        emitter.onNext(i);
                    }
                    emitter.onComplete();
                }
            })
            .flatMap(new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    List<String> list = new ArrayList<>();
                    list.add(String.valueOf(0));
                    list.add(String.valueOf(1));
                    return Observable.fromIterable(list).delay(100, TimeUnit.MILLISECONDS);
                }
            })
            .subscribe(
                    new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {

                        }

                        @Override
                        public void onNext(String s) {
                            Log.d(TAG + "flatMap", s);
                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onComplete() {
                            Log.d(TAG + "flatMap", "complete");
                        }
                    });
}
           

輸出如下:

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

3)concatMap() 操作符

/**
     * ======================concatMap============================
     * 
     * 與上面的flatMap作用基本一樣,與flatMap唯一不同的是concat能保證Observer接收到Observable集合發送事件的順序
     */
    public static void concatMap() {

    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 10; i++) {
                        emitter.onNext(i);
                    }
                    emitter.onComplete();
                }
            })
            .concatMap(new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    List<String> list = new ArrayList<>();
                    list.add(String.valueOf(0));
                    list.add(String.valueOf(1));
                    return Observable.fromIterable(list).delay(100, TimeUnit.MILLISECONDS);
                }
            })
            .subscribe(
                    new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {

                        }

                        @Override
                        public void onNext(String s) {
                            Log.d(TAG + "flatMap", s);
                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onComplete() {
                            Log.d(TAG + "flatMap", "complete");
                        }
                    });
}
           

輸出如下:

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

4)buffer() 操作符

/**
 * ========================buffer 操作符 ======================================
 * 
 * 把發射資料按照一定間隔分成若幹段。按每段的資料轉換成新的Observable,這個Observable把一段資料一次性發射出去。
 * 可以簡單地了解為把一組資料分成若幹小組發射出去,而不是單個單個地發射出去
 */
public static void buffer() {

    Observable
            .just(1, 2, 3, 4, 5, 6)
            .buffer(2)
            .subscribe(new Observer<List<Integer>>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(List<Integer> integers) {
                    for (Integer integer : integers) {
                        Log.d(TAG + "buffer", String.valueOf(integer));
                    }
                    Log.d(TAG + "buffer", "============================");
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "buffer", "onComplete");
                }
            });
}
           

輸出如下:

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

3、 合并操作符:組合多個被觀察者(Observable)&合并需要發送的事件。包含:concatMap(),concat(),merge(),mergeArray(),concateArray(),reduce(),collect(),startWith(),zip(),count()等

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

1)merge(),concat ()操作符

/**
     * ========================merge,concat 操作符 ======================================
     * 
     * merge操作符是把多個Observable合并成一個進行發射。merge可能會讓合并到Observable的資料順序發生錯亂(組合被觀察者數量<=4個)(并行無序)
     * mergeArray操作符和merge作用一樣,但不同的是組合被觀察者數量>4個)(并行無序)
     * 
     * concat操作符也是把多個Observable合并成一個進行發射。但concat則保證合并的每個Observable的事件按順序發射出去。(組合被觀察者數量<=4個)(串行有序)
     * concatArray操作符和concat作用一樣,但不同的是組合被觀察者數量>4個)(串行有序)
     */
    public static void merge() {
        Observable observable1 = Observable.just(1, 2, 3);
        Observable observable2 = Observable.just("哈哈", "嘻嘻", "啊啊");

    Observable
            .merge(observable1, observable2).delay(1, TimeUnit.SECONDS)
            .subscribe(new Observer() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Object o) {
                    Log.d(TAG + "merge", o.toString());
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "merge", "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

2)concatDelayError()/mergeDelayError() 操作符

/**
     * ========================concatDelayError()/mergeDelayError() 操作符 ======================================
     * 
     * 這兩個操作符的作用是: 使用concat()和merge()操作符時,若其中一個被觀察者發送onError事件,則會馬上終止其它被觀察者繼續發送事件。是以呐,這時使用concatError()/
     * mergeDelayError()事件可以使onError事件推遲到其它被觀察者發送事件結束後在再觸發
     */
    public static void concatDelayError() {

    Observable
            .concatArrayDelayError(Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onError(new NullPointerException());
                    emitter.onNext(3);
                    emitter.onNext(4);
                }
            }), Observable.just(5, 6))


            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG + "cDelayError", String.valueOf(integer));
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG + "cDelayError", "onError");
                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "cDelayError", "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

3)zip 操作符

/**
     * ========================zip 操作符 ======================================
     * 
     * 把多個Observable合并後,并且把這些Observable的資料進行轉換再發射出去。轉換之後的資料數目由最短資料長度的那個Observable決定。發射完最終會自動調用觀察者的onComplete方法()
     * 
     * 如以下代碼: 資料長度為4的observable1和資料長度為3的observable2進行合并轉換後,觀察者隻接收到3個資料
     */

public static void zip() {

    Observable observable1 = Observable.just(1, 2, 3, 4);
    Observable observable2 = Observable.just("哈哈", "嘻嘻", "啊啊");


    Observable
            .zip(observable1, observable2, new BiFunction<Integer, String, String>() {

                @Override
                public String apply(Integer integer, String s) throws Exception {
                    return s + integer;
                }
            })
            .subscribe(new Observer() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Object o) {
                    Log.d(TAG + "zip", o.toString());
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "merge", "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

4)combineLatest 操作符

/**
     * ========================combineLatest 操作符 ======================================
     * 
     * 當兩個Observable 中的任何一個發送了資料,将先發送了資料的Observable的最新(最後)一個資料和另一個Observable發送的每個資料結合,最終基于該結合的結果發送資料
     * 
     * 與zip()的差別: zip()是按個數合并,即1對1合并;而combineLatest()是基于時間合并,,即在同一時間點上合并
     */

/**
     *
     *  ======================combineLatestDelayError =================================
     *
     *  作用類似于concatDelayError() / mergeDelayError(),用于錯誤處理

public static void combineLatest() {

    Observable
            .combineLatest(Observable.just(1, 2, 3)
                    , Observable.intervalRange(1, 4, 2, 1, TimeUnit.SECONDS)
                    , new BiFunction<Integer, Long, String>() {
                        @Override
                        public String apply(Integer integer, Long aLong) throws Exception {
                            return "合并後的資料為:" + integer + aLong;
                        }
                    })
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String s) {
                    Log.d(TAG + "combineLatest", s);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "combineLatest", "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

5)reduce ()操作符

/**
 * ======================reduce  操作符=================================
 * 
 * 把被觀察者需要發送的資料按照指定規則聚合成一個資料發送
 * 
 * 聚合的規則需要我們編寫,内部流程是前兩個資料按照我們的規則合并後,再與後面的資料按規則合并,依次類推。這樣說有點抽象,看下面的例子。
 */
public static void reduce() {

    Observable
            .just(1, 2, 3, 4, 5)
            .reduce(new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer integer, Integer integer2) throws Exception {
                    Log.d(TAG + "reduce", "本次合并的過程是:  " + integer + "+" + integer2);
                    return integer + integer2;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "reduce", "最終計算的結果是 :  " + integer);
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

6)collect() 操作符

/**
 * ========================collect 操作符=================================
 * 
 * 作用是把 Observable(被觀察者)發送的事件收集到一個資料結構中
 */
public static void collect() {

    Observable
            .just(1, 2, 3, 4, 5)
            .collect(new Callable<ArrayList<Integer>>() {
                @Override
                public ArrayList<Integer> call() throws Exception {
                    return new ArrayList<>();
                }
            }, new BiConsumer<ArrayList<Integer>, Integer>() {
                @Override
                public void accept(ArrayList<Integer> integers, Integer integer) throws Exception {
                    integers.add(integer);
                }
            })
            .subscribe(new Consumer<ArrayList<Integer>>() {
                @Override
                public void accept(ArrayList<Integer> integers) throws Exception {
                    Log.d(TAG + "collect", integers.toString());
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

7)startWith()/startWithArray() 操作符

/**
    * ========================startWith/startWithArray 操作符=================================
    * 
    * 在一個被觀察者發送時間前,追加發送一些資料/一個新的被觀察者
    */
    public static void startWith() {

    Observable.just(7, 8, 9)
            .startWith(6)   //在發送序列去追加單個資料
            .startWithArray(4, 5)  //在發送序列去追加多個資料
            .startWith(Observable.just(1, 2, 3))  //在發送序列去追加單個被觀察者
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "startWith", String.valueOf(integer));
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

8)count() 操作符

/**
    * ========================count 操作符=================================
    * 
    * 統計被觀察者發送事件數量
    */
    public static void count() {
        Observable
                .just(1, 2, 3, 4)
                .count()
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG + "count", "發送事件的數量 : " + aLong);
                    }
                });
    }
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

4、功能操作符:輔助被觀察者(Observable) 發送事件時實作一些功能性需求,如錯誤處理,線程排程。

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

1) subscribe() 操作符

/**
     * ==================subscribe 操作符===========================
     *
     *  連接配接被觀察者和觀察者
     */
    public static void subscribe() {

    //建立被觀察者
    Observable observable = Observable.create(new ObservableOnSubscribe() {
        @Override
        public void subscribe(ObservableEmitter emitter) throws Exception {
            emitter.onNext("事件");
        }
    });

    //建立觀察者
    Observer observer = new Observer() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG + "subscribe", "開始連接配接");
        }

        @Override
        public void onNext(Object o) {
            Log.d(TAG + "subscribe", "收到事件");
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    };

    //通過subscribe 進行 被觀察者(Observable)與觀察者(Observer)的連接配接
    observable.subscribe(observer);

}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

2) delay() 操作符

/**
     * ==================delay 操作符=======================================
     * 
     * 延遲發送事件
     * 
     * delay有多個重載方法:
     * 
     * delay(long delay,TimeUnit unit) :指定延遲時間。 參數一:時間 ; 參數二:時間機關
     * 
     * delay(long delay, TimeUnit unit, Scheduler scheduler)  指定延遲時間&線程排程器。參數一:時間 ; 參數二:時間機關;參數三: 線程排程器
     * 
     * delay(long delay, TimeUnit unit, boolean delayError)  指定延遲時間&線程排程器。參數一:時間 ; 參數二:時間機關;參數三: 是否錯誤延遲
     * 
     * delay(long delay, TimeUnit unit, Scheduler scheduler, boolean delayError)  指定延遲時間&線程排程器&錯誤延遲。參數一:時間 ; 參數二:時間機關;
     * 參數三: 線程排程器; 參數四:是否錯誤延遲(若中間發生錯誤,是否如常執行,執行完在執行onError())
     */
    public static void delay() {

    Observable
            .just(1, 2)
            .delay(10, TimeUnit.SECONDS)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "delay", String.valueOf(integer));
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

3) do 系列操作符

/**
     * ========================do 系列操作符 =======================================
     * 
     * 在事件發送&接收的整個周期過程中進行操作。
     * 
     * 如發送事件前的操作,發送事件後的回調請求
     * 
     * do系列操作符包含以下:
     * 
     * doOnEach() :當Observable每發送一次事件就會調用一次(包含onNext(),onError(),onComplete())
     * doOnNext(): 執行 onNext()前調用
     * doAfterNext(): 執行onNext()後調用
     * doOnComplete():執行onComplete()前調用
     * doOnError():執行 onError()前調用
     * doOnTerminate(): 執行終止(無論正常發送完畢/異常終止)
     * doFinally(): 最後執行
     * doOnSubscribe() :觀察者訂閱是調用
     * doOnUnScbscribe(): 觀察者取消訂閱時調用
     */

public static void dos() {
    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onError(new NullPointerException());
                }
            })
            .doOnEach(new Consumer<Notification<Integer>>() {
                @Override
                public void accept(Notification<Integer> integerNotification) throws Exception {
                    Log.d(TAG + "doOnEach", "doOnEach:  " + String.valueOf(integerNotification.getValue()));
                }
            })
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "doOnNext", "doOnNext:  " + String.valueOf(integer));
                }
            })
            .doAfterNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "doAfterNext", "doAfterNext:  " + String.valueOf(integer));
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    Log.d(TAG + "doOnComplete", "doOnComplete");
                }
            })
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.d(TAG + "doOnError", "doOnError");
                }
            })
            .doOnTerminate(new Action() {
                @Override
                public void run() throws Exception {
                    Log.d(TAG + "doOnTerminate", "doOnTerminate");
                }
            })
            .doAfterTerminate(new Action() {
                @Override
                public void run() throws Exception {
                    Log.d(TAG + "doAfterTermi", "doAfterTerminate");
                }
            })
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    Log.d(TAG + "doOnSubscribe", "doOnSubscribe");
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    Log.d(TAG + "doFinally", "doFinally");
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "收到的資料:  " + integer);
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError");
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });

}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

4) onErrorReturn() 操作符

/** ====================onErrorReturn() 操作符 ======================
     * 
     * 可以捕獲錯誤。遇到錯誤時,發送一個特殊事件,并且正常終止.注意後面的事件不會再發送
     */
    public static void onErrorReturn() {

    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onError(new Throwable("Throwable"));
                    emitter.onNext(3);

                }
            })
            .onErrorReturn(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable throwable) throws Exception {
                    Log.e(TAG, "發生了錯誤:  " + throwable.getMessage());
                    return 404;
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, String.valueOf(integer));
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, e.getMessage());
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });

}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

5) onExceptionResumeNext()/onErrorResumeNext() 操作符

/**
 * ====================onExceptionResumeNext()/onErrorResumeNext() 操作符 ======================
 * 
 * 遇到錯誤時發送一個新的Observable 。并且正常終止.注意原Observable後面的事件不會再發送
 * 
 * 如果捕獲Exception的話使用onExceptionResumeNext() ,捕獲錯誤的用onErrorResumeNext()
 */
public static void onExceptionResumeNext() {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onError(new NullPointerException("NullPointerException"));
            emitter.onNext(3);
        }
    }).onExceptionResumeNext(new Observable<Integer>() {
        @Override
        protected void subscribeActual(Observer<? super Integer> observer) {
            observer.onNext(4);
            observer.onNext(5);

        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, String.valueOf(integer));
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

6) retry() 操作符

/**
 * ====================retry() 操作符 ======================
 * 
 * 作用是:出現錯誤時,讓被觀察者重新發送資料
 * 注:若發送錯誤,則一直重新發送
 * 
 * 有幾個重載方法:
 * retry() : 出現錯誤時,讓被觀察者重新發送資料。若錯誤一直發生,則一直重新發送
 * 
 * retry(long time):與retry不同的書,若錯誤一直發生,被觀察者則一直重新發送資料,但這持續重新發送有次數限制
 * 
 * retry(Predicate predicate) :  出現錯誤時,根據指定邏輯(可以捕獲到發生的錯誤)決定是否讓被觀察者重新發送資料
 * 
 * retry(new BiPredicate<Integer, Throwable>):出現錯誤時,根據指定邏輯(可以捕獲重發的次數和發生的錯誤)決定是否讓被觀察者重新發送資料
 * 
 * retry(long time,Predicate predicate) : 出現錯誤時,根據指定邏輯(可以捕獲到發生的錯誤)決定是否讓被觀察者重新發送資料。并且有持續重發的次數限制
 */
public static void retry() {

    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onError(new Throwable("發生錯誤了"));
                    emitter.onNext(3);
                }
            })
            .retry(new BiPredicate<Integer, Throwable>() {
                @Override
                public boolean test(Integer integer, Throwable throwable) throws Exception {

                    // interger 為重試次數 ,throwable 為捕獲到的異常

                    Log.e(TAG + "retry", throwable.getMessage());
                    Log.e(TAG + "integer", "重試次數: " + integer);

                    //return true : 重新發送請求(若持續遇到錯誤,就持續重新發送)
                    //return false :    不重新發送資料 并且調用觀察者的onError()方法結束

                    if (integer > 2)
                        return false;
                    return true;
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {
                    Log.e(TAG + "retry", String.valueOf(integer));
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError");
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

7) retryUntil() 操作符

/**
     * ===================retryUntil() 操作符============================
     * 
     * 發送事件遇到錯誤,指定規則是否重新發送。retry(Predicate predicate)。
     * 
     *  return true : 不重新發送請求,并且調用觀察者的onError()方法結束
     *  return false : 重新發送資料(若持續遇到錯誤,就持續重新發送)
     */
    public static void retryUntil() {

    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onError(new Throwable("發生錯誤了"));
                    emitter.onNext(3);
                }
            })
            .retryUntil(new BooleanSupplier() {
                @Override
                public boolean getAsBoolean() throws Exception {

                     //return true : 不重新發送請求,并且調用觀察者的onError()方法結束
                    // return false : 重新發送資料(若持續遇到錯誤,就持續重新發送)
                    return false;
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {
                    Log.e(TAG + "retryUntil", String.valueOf(integer));
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "retryUntil", "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

8) retryWhen() 操作符

/**
     * ===================retryWhen() 操作符============================
     * 
     * 遇到錯誤時,将發生的錯誤傳遞給一個新的被觀察者(Observable),并決定是否需要重新訂閱原始被觀察者(Observable) &  發送事件
     */
    public static void retryWhen() {
        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onError(new Throwable("發送了錯誤"));
                        emitter.onNext(3);
                    }
                })
                //遇到Error時會回調
                .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                        return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                            @Override
                            public ObservableSource<?> apply(Throwable throwable) throws Exception {

                            //1、若傳回的Observable發送的事件 = Error ,則原始的Observable則不重新發送事件。該異常資訊可在觀察者的onError中獲得
                            //return Observable.error(throwable);

                            //2、若傳回的Observable發送的事件= Next事件(和next的内容無關),則原始的Observable重新發送事件(若持續遇到錯誤,則持續發送)
                            return Observable.just(5); //僅僅是作為一個觸發重新訂閱原被觀察者的通知,什麼資料并不重要,隻有不是onComplete/onError事件
                        }
                    });

                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG + "retryWhen", String.valueOf(integer));
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG + "retryWhen", e.getMessage());
                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "retryWhen", "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

9) repeat() 操作符

/**
     * ===============repeat() 操作符==============
     * 
     * repeat操作符的作用是重複發射 observable的資料序列,可以使無限次也可以是指定次數.不傳時為重複無限次
     */
    public static void repeat() {

    Observable
            .just(1, 2, 3)
            .repeat(3)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "repeat", String.valueOf(integer));
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

10)repeatWhen() 操作符

/**
 *  ===============repeatWhen() 操作符==============
 *
 * 将原始 Observable 停止發送事件的辨別(Complete() / Error())轉換成1個 Object 類型資料傳遞給1個新被觀察者(Observable)
 * ,以此決定是否重新訂閱 & 發送原來的 Observable
 */
public static void repeatWhen() {

    Observable
            .just(1, 2, 4)
            .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
                    return  objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Object o) throws Exception {

                            //若新被觀察者(Observable)傳回1個Complete()/  Error()事件,則不重新訂閱 & 發送原來的 Observable
                            //Observable.empty() = 發送Complete事件,但不會回調觀察者的onComplete()
                            return Observable.empty();

                            // return Observable.error(new Throwable("不再重新訂閱事件"));
                            // 傳回Error事件 = 回調onError()事件,并接收傳過去的錯誤資訊。

                            // 情況2:若新被觀察者(Observable)傳回其餘事件,則重新訂閱 & 發送原來的 Observable
                            // return Observable.just(1);
                            // 僅僅是作為1個觸發重新訂閱被觀察者的通知,發送的是什麼資料并不重要,隻要不是Complete() /  Error()事件
                        }
                    });
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

11)debounce() 操作符

/**
     * ===============debounce() 操作符==============
     * <p>
     * 一定的時間内沒有操作就會發送事件(隻會發送最後一次操作的事件)。
     * <p>
     * 以下的例子: 發送5個事件,每個事件間隔1秒。但是debounce限定了2秒内沒有任何操作才會真正發送事件。是以隻有最後一次滿足條件,隻能接收到事件 5
     */
    public static void debounce() {

    Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
            .debounce(2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.d(TAG + "debounce", String.valueOf(aLong));
                }
            });

}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

12)subscribeOn / ObserverOn 操作符

/**
     * ===============subscribeOn() 操作符==============
     * ===============observerOn() 操作符==============
     * <p>
     * <p>
     * subscribeOn : 發送事件的線程
     * observerOn: 接收事件的線程
     * <p>
     * 線程排程器:
     * Schedulers.io(): 代表io操作的線程,通常用于網絡,讀寫檔案等io密集型的操作
     * Schedulers.compucation(): 代表CPU計算密集型的操作,例如需要大量計算的操作
     * Schedulers.newThread(): 代表一個正常的新線程
     * AndroidSchedulers。mainThread(): 代表Android的主線程
     */
    public static void subscribeOn_observerOn() {

    Observable
            .create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("事件");
                    Log.d(TAG + "subscribeOn_ObserverOn", "發送事件:" + Thread.currentThread().getName());
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG + "subscribeOn_ObserverOn", "接收事件:   " + Thread.currentThread().getName());
                }
            });

}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

5、過濾操作符:用于将Observable發送的資料進行過濾和選擇。讓Observable傳回我們所需要的資料。過濾操作符有buffer(),filter(),skip(),take(),skipLast(),takeLast(),throttleFirst(),distainctUntilChange()。

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

1)filter() 操作符

/**
     * ========================filter() 操作符 ======================================
     * 
     * 對被觀察者發送的事件做過濾操作。隻有符合篩選條件的事件才會被觀察者所接收。
     * 
     * return true : 繼續發送
     * 
     * return false : 不發送
     */
    public static void filter() {

    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 3; i++) {
                        emitter.onNext(i);
                    }
                }
            })
            .filter(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    //return true : 繼續發送
                    //return false : 不發送
                    return integer != 2; // 本例子過濾了 等于2的情況
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "filter", String.valueOf(integer));
                }
            });

}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

2)distinct() 操作符

/**
     * ========================distinct 操作符 ======================================
     * 
     * 簡單地說就是去重。發射的資料包含重複的,會将重複的篩選掉。也就是,它隻允許還沒有被發射過的資料通過,被觀察者接收。發射完資料會自動調用onComplete()方法
     * 
     * y以下代碼:觀察者隻會接收到 : 1,2,3,5 四個數值
     */
    public static void distinct() {
        Observable
                .just(1, 2, 3, 2, 3, 5)
                .distinct()
                .subscribe(
                        new Observer<Integer>() {
                            @Override
                            public void onSubscribe(Disposable d) {

                        }

                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG + "distinct", String.valueOf(integer));
                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onComplete() {
                            Log.d(TAG + "distinct", "onComplete");
                        }
                    });

}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

3)distinctUntilChanged() 操作符

/**
     * ========================distinctUntilChanged 操作符 ======================================
     * 
     * 過濾掉連續重複的事件
     */
    public static void distinctUntilChanged() {
        Observable.just(1, 2, 3, 1, 2, 3, 3, 4, 4)
                .distinctUntilChanged()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, String.valueOf(integer));
                    }
                });
    }
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

4)skip(),skipLast()操作符

/**
 * ========================skip,skipLast======================================
 *
 *  skip 操作符是把Observable發射的資料過濾點掉前n項。而take操作符隻能接收前n項。另外還有skipLast和takeLast則是從後往前進行過濾.接收完會調用onComplete()方法
 *
 */

/**
 * 以下代碼輸出: 3,4,5
 */
public static void skip() {
    Observable
            .just(1, 2, 3, 4, 5, 6, 7)
            .skip(2)  //過濾前2項
            .skipLast(3) //過濾後3項
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG + "skip", "根據順序過濾" + String.valueOf(integer));
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "skip", "onComplete");
                }
            });
//-------------------------------------------------------------------------------------------------
        Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
                .skip(1, TimeUnit.SECONDS)   //過濾地1s發送的資料
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Long aLong) {
                    Log.d(TAG + "skip", "根據事件過濾" + String.valueOf(aLong));
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });


}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

5)take() 操作符

/**
     * ====================== take 操作符===========================
     * 隻能接收兩個事件
     */
    public static void take() {
        Observable
                .just(1, 2, 3, 4, 5, 6, 7)
                .take(2)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG + "skip", String.valueOf(integer));
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "skip", "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

6)takeLast() 操作符

/**
     * =======================takeLast() 操作符 ======================
     * 
     * 隻能接收被觀察者發送的最後幾個事件
     */

public static void takeLast() {
    Observable
            .just(1, 2, 3, 4, 5, 6, 7)
            .takeLast(2)
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG + "skip", String.valueOf(integer));
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "skip", "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

7)elementAt() 操作符

/**
     * ========================elementAt() 操作符 ======================================
     * 
     * 隻發射第n項資料.
     *  一個參數和兩個參數時:
     *  elementAt(第n項)
     * elementAt(第n項,第N項不存在時預設值)
     * 
     *  n為負數時,報IndexOUtOfBoundExection。為正數但超過發射資料長度不會報異常會使用預設值代替
     */

public static void elementAt() {

    Observable.range(1, 5)
            .elementAt(6, 10)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "elementAt", String.valueOf(integer));
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

8)elementAtOrError()操作符

/**
     * ==============elementAtOrError()===================================================
     * 
     * 在elementAtError()的基礎上,當出現越界情況(當擷取位置的索引>事件序列的長度),即抛出異常
     */
    public static void elementAtOrError() {
        Observable.range(1, 5)
                .elementAtOrError(6)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "elementAtOrErr", String.valueOf(integer));
                    }
                });
    }
           

9)ignoreElements() 操作符

/**
     * ========================ignoreElements() 操作符 ======================================
     * 
     * 不管發射的資料.隻希望在它完成時和遇到錯誤時收到通知
     */
    public static void ignoreElements() {

    Observable
            .range(0, 10)
            .ignoreElements()
            .subscribe(new Action() {
                @Override
                public void run() throws Exception {
                    Log.d(TAG + "ignoreEles", "完成了");
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.d(TAG + "ignoreEles", "出錯了");
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

10)ofType() 操作符

/**
     * ========================ofType() 操作符 ======================================
     * 
     * 通過資料的類型過濾資料,隻發送指定類型資料。
     * 
     * 以下代碼觀察者隻接收到: 1,2
     */
    public static void ofType() {

    Observable.just("哈哈", 1, "嘻嘻", 2, 3.5)
            .ofType(Integer.class)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "ofType", String.valueOf(integer));
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

11)throttleFirst()/throttleLast() 操作符

/**
     * ========================throttleFirst()/throttleLast() 操作符 ======================================
     * 
     * throttleFirst() 在某段時間内,隻發送該段事件第一次事件
     * 
     * throttleLast()  在某段時間内,隻發送該段事件最後一次事件
     */

public static void throttleFirst() {
    Observable.interval(300, TimeUnit.MILLISECONDS) //每個0.3秒發送一個事件
            .throttleFirst(1, TimeUnit.SECONDS) //隻接收每秒内發送的第一個資料
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.d(TAG + "throttleFirst", String.valueOf(aLong));
                }
            });
}
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

12)sample()

/**
     * ===================sample()=================================
     * 
     * 在某段時間内,隻發送該段時間内最新(最後)1次事件
     * 
     * 與throttleLast類似
     */
    public static void sample() {
        Observable.interval(300, TimeUnit.MILLISECONDS) //每個0.3秒發送一個事件
                .sample(1, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG + "sample", String.valueOf(aLong));
                    }
                });
    }
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

13)firstElement()/lastElement()

/**
     * ========================firstElement()/lastElement()==========================
     * 
     * 選取第一個元素/最後一個元素
     */
    public static void firstElement() {
        Observable.just(1, 2, 3)
                .firstElement()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "firstElement", String.valueOf(integer));
                    }
                });
    }
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符
public static void lastElement() {
	        Observable
	                .just(1, 2, 3)
	                .lastElement()
	                .subscribe(new Consumer<Integer>() {
	                    @Override
	                    public void accept(Integer integer) throws Exception {
	                        Log.d(TAG + "lastElement", String.valueOf(integer));
	                    }
	                });
	    }
           
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符

6、條件 / 布爾操作符:通過設定函數,判斷被觀察者(Observable)發送的事件是否符合條件

Rxjava操作符一、RxJava的基礎使用二、RxJava操作符
Rxjava操作符一、RxJava的基礎使用二、RxJava操作符