天天看点

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

目录

  • 一、RxJava的基础使用
    • 1. 定义
    • 2. 作用
    • 3. 特点
    • 4.Rxjava原理介绍
    • 5. 基本使用
      • 5.1.1 使用步骤
      • 5.1.2 步骤详解
  • 二、RxJava操作符
    • 1、创建操作符 :创建被观察者(Observable)对象&发送事件
      • 1)Create() 操作符
      • 2)Just() 操作符
      • 3)fromIterable() 操作符
      • 4)timer()操作符
      • 5)fromArray() 操作符
      • 6)interval() 定时器
      • 7)intervalRange() 操作符
      • 8)Range() 操作符
    • 2、转换操作符:变换被观察者(Observable)发送的事件。将Observable发送的数据按照一定的规则做一些变换,然后再将变换的数据发射出去。变换的操作符有map,flatMap,concatMap,switchMap,buffer,groupBy等等。
      • 1)map()操作符
      • 2)flatMap()操作符
      • 3)concatMap() 操作符
      • 4)buffer() 操作符
    • 3、 合并操作符:组合多个被观察者(Observable)&合并需要发送的事件。包含:concatMap(),concat(),merge(),mergeArray(),concateArray(),reduce(),collect(),startWith(),zip(),count()等
      • 1)merge(),concat ()操作符
      • 2)concatDelayError()/mergeDelayError() 操作符
      • 3)zip 操作符
      • 4)combineLatest 操作符
      • 5)reduce ()操作符
      • 6)collect() 操作符
      • 7)startWith()/startWithArray() 操作符
      • 8)count() 操作符
    • 4、功能操作符:辅助被观察者(Observable) 发送事件时实现一些功能性需求,如错误处理,线程调度。
      • 1) subscribe() 操作符
      • 2) delay() 操作符
      • 3) do 系列操作符
      • 4) onErrorReturn() 操作符
      • 5) onExceptionResumeNext()/onErrorResumeNext() 操作符
      • 6) retry() 操作符
      • 7) retryUntil() 操作符
      • 8) retryWhen() 操作符
      • 9) repeat() 操作符
      • 10)repeatWhen() 操作符
      • 11)debounce() 操作符
      • 12)subscribeOn / ObserverOn 操作符
    • 5、过滤操作符:用于将Observable发送的数据进行过滤和选择。让Observable返回我们所需要的数据。过滤操作符有buffer(),filter(),skip(),take(),skipLast(),takeLast(),throttleFirst(),distainctUntilChange()。
      • 1)filter() 操作符
      • 2)distinct() 操作符
      • 3)distinctUntilChanged() 操作符
      • 4)skip(),skipLast()操作符
      • 5)take() 操作符
      • 6)takeLast() 操作符
      • 7)elementAt() 操作符
      • 8)elementAtOrError()操作符
      • 9)ignoreElements() 操作符
      • 10)ofType() 操作符
      • 11)throttleFirst()/throttleLast() 操作符
      • 12)sample()
      • 13)firstElement()/lastElement()
    • 6、条件 / 布尔操作符:通过设置函数,判断被观察者(Observable)发送的事件是否符合条件

一、RxJava的基础使用

1. 定义

RxJava 在 GitHub 的介绍:

RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
// 翻译:RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
           

总结:RxJava 是一个 基于事件流、实现异步操作的库

2. 作用

实现异步操作

类似于 Android中的 AsyncTask 、Handler作用

3. 特点

由于 RxJava的使用方式是:基于事件流的链式调用,所以使得 RxJava:

  1. 逻辑简洁
  2. 实现优雅
  3. 使用简单

    更重要的是,随着程序逻辑的复杂性提高,它依然能够保持简洁 & 优雅

4.Rxjava原理介绍

  • Rxjava原理 基于 一种扩展的观察者模式
  • Rxjava的扩展观察者模式中有4个角色:
角色 作用 类比
被观察者(Observable) 产生事件 顾客
观察者(Observer) 接收事件,并给出响应动作 厨房
订阅(Subscribe) 连接 被观察者 & 观察者 服务员
事件(Event) 被观察者 & 观察者 沟通的载体 菜式

具体原理

结合 顾客到饭店吃饭 的生活例子理解:

Rxjava操作符一、RxJava的基础使用二、RxJava操作符
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

即RxJava原理可总结为:被观察者 (Observable) 通过 订阅(Subscribe) 按顺序发送事件 给观察者 (Observer), 观察者(Observer) 按顺序接收事件 & 作出对应的响应动作。具体如下图:

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

5. 基本使用

5.1.1 使用步骤

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

5.1.2 步骤详解

加入依赖:

compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
           

具体实现:

public class MainActivity extends AppCompatActivity {
		    private static final String TAG = "Rxjava";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);		

	// 步骤1:创建被观察者 Observable & 生产事件
	// 即 顾客入饭店 - 坐下餐桌 - 点菜
    
    //  1. 创建被观察者 Observable 对象
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        // 2. 在复写的subscribe()里定义需要发送的事件
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // 通过 ObservableEmitter类对象产生事件并通知观察者
            // ObservableEmitter类介绍
                // a. 定义:事件发射器
                // b. 作用:定义需要发送的事件 & 向观察者发送事件
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    });

	// 步骤2:创建观察者 Observer 并 定义响应事件行为
	// 即 开厨房 - 确定对应菜式
    
    Observer<Integer> observer = new Observer<Integer>() {
        // 通过复写对应方法来 响应 被观察者
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "开始采用subscribe连接");
        }
        // 默认最先调用复写的 onSubscribe()

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "对Next事件"+ value +"作出响应"  );
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "对Error事件作出响应");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "对Complete事件作出响应");
        }
    };

    
    // 步骤3:通过订阅(subscribe)连接观察者和被观察者
    // 即 顾客找到服务员 - 点菜 - 服务员下单到厨房 - 厨房烹调
    observable.subscribe(observer);
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

基于事件流的链式调用方式:

public class MainActivity extends AppCompatActivity {

private static final String TAG = "Rxjava";

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

// RxJava的流式操作
        Observable.create(new ObservableOnSubscribe<Integer>() {
        // 1. 创建被观察者 & 生产事件
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            // 2. 通过通过订阅(subscribe)连接观察者和被观察者
            // 3. 创建观察者 & 定义响应事件的行为
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe连接");
            }
            // 默认最先调用复写的 onSubscribe()

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "对Next事件"+ value +"作出响应"  );
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "对Error事件作出响应");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "对Complete事件作出响应");
        }

    });
}
           

}

二、RxJava操作符

1、创建操作符 :创建被观察者(Observable)对象&发送事件

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

1)Create() 操作符

/**
        *
        *  =======================基本用法Create====================================
        *
        *  Observable 被观察者
        *
        *  ObservableOnSubscribe  观察者与被观察者的桥接(事件发射器)
        *
        *  ObServer 观察者
        *
        *   被观察者  -->  观察者与被观察者的桥接  --> 观察者
        *
        *   被观察者.create(观察者与被观察者的桥接).subscribe(观察者)
        *
        */

    Observable
            .create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    for (int i = 0; i < 10; i++) {
                        emitter.onNext(String.valueOf(i));
                    }
                    emitter.onComplete();
                }
            })
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String s) {
                    Log.d(TAG, s);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "complete");
                }
            });
}
           

输出如下:

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

2)Just() 操作符

/**
     * ====================just 操作符 ====================
     * 
     * 此操作符的作用是将传入的数据依次发送出去.最多可以传10个参数
     * 
     * 以下代码会依次把 1-10的字符串发送出去。执行10此观察者的onNext方法,最后默认执行onComplete方法
     */
    public static void just() {

    Observable
            .just("1", "2", "3", "4", "5",
                    "6", "7", "8", "9", "10")
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String s) {
                    Log.d(TAG + "just", s);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "just", "complete");
                }
            });
}
           

输出如下:

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

3)fromIterable() 操作符

/**
 * ====================fromIterable 操作符 ====================
 * 
 * 此操作符的作用是将传入的数组集合按脚标依次发送出去
 * 
 * 以下代码会依次把 0-9的字符串发送出去。执行10此观察者的onNext方法,最后默认执行onComplete方法
 */

public static void fromIterable() {

    List<String> list = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        list.add(String.valueOf(i));
    }

    Observable
            .fromIterable(list)
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String s) {
                    Log.d(TAG + "fromIterable", s);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "fromIterable", "complete");
                }
            });
}
           

输出如下:

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

4)timer()操作符

/**
     * ==========================timer操作符 ==============================
     * 
     * 延迟指定时间发送一个0数值(Long类型)
     * 
     * timer操作符主要运行在一个新线程中,也可以自定义线程调度器(第三个参数)
     */
    public static void timer() {

    Observable
            .timer(2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.d(TAG + "timer", String.valueOf(aLong));
                }
            });

}
           

输出如下:

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

5)fromArray() 操作符

/**
     * ====================fromArray 操作符============================
     * 
     * 对一个数组集合进行观察,把数组一次性发给观察者,只会执行一次观察者的onNext,最后默认执行onComplete方法
     */
    public static void fromArray() {

    List<String> list = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        list.add(String.valueOf(i));
    }
    Observable
            .fromArray(list)
            .subscribe(new Observer<List<String>>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(List<String> strings) {
                    Log.d(TAG + "fromArray", strings.toString());
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "fromArray", "complete");
                }
            });
}
           

输出如下:

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

6)interval() 定时器

/**
     * ====================interval  定时器====================
     * 
     * 这个相当于定时器,用它可以取代CountDownTimer。它会按照设定的间隔时间,每次发送一个事件,发送的事件序列:默认从0开始,无限递增的整数序列
     * 
     * 以下代码输出:   0 ----(5秒后)-----1-----(5秒后)------2---------(5秒后)--------3-------(5秒后)-----.......
     */
    public static void interval() {

    Observable
            .interval(5, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.d(TAG + "interval", String.valueOf(aLong));//从0开始输出
                }
            });
}
           

输出如下:

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

7)intervalRange() 操作符

/**
     * intervalRange  操作符
     * <p>
     * 作用和interval相同,但可以指定发送数据的数量
     */
    public static void intervalRange() {


    /**
     *  参数1: 起始发送值
     *  参数2:发送数量
     *  参数3:首次发送延迟事件
     *  参数4:每次发送事件间隔
     *  参数5:时间单位
     *
     */
    Observable
            .intervalRange(2, 10, 3, 1, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.d(TAG + "intervalRan", String.valueOf(aLong));//从2开始输出
                }
            });
}
           

输出如下:

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

8)Range() 操作符

/**
     * Range  操作符
     * <p>
     * 作用发送指定范围的序列,可指定范围.作用类似intervalRange,但不同的是range是无延迟发送
     */
    public static void range() {

    Observable
            .range(2, 6)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "interval", String.valueOf(integer));//从2开始输出
                }
            });
}
           

输出如下:

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

2、转换操作符:变换被观察者(Observable)发送的事件。将Observable发送的数据按照一定的规则做一些变换,然后再将变换的数据发射出去。变换的操作符有map,flatMap,concatMap,switchMap,buffer,groupBy等等。

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

1)map()操作符

/**
     * ======================map============================
     * 
     * map操作符,可以说是的被观察者转换器。 通过指定一个Funcation对象,将被观察者(Observable)转换成新的被观察者(Observable)对象并发射,观察者会收到新的被观察者并处理
     * 
     * 
     * 本来发射的数据是 数字1,然后观察者接收到的是 “ 这是新的观察数据===: 1”
     * 
     * 流程:  被观察者.create(事件发射器).map(转换器).subscribe(观察者)
     */

public static void map() {
    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 10; i++) {
                        emitter.onNext(i);
                    }
                    emitter.onComplete();
                }
            })
            .map(new Function<Integer, String>() {
                @Override
                public String apply(Integer integer) throws Exception {
                    return "这是新的观察数据===:" + integer;

                }
            })
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG + "map", s);
                }
            });
}
           

输出如下:

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

2)flatMap()操作符

/**
     * ======================flatMap============================
     * 
     * flatMap操作符, 将Observable每一次发射的事件都转换成一个Observable,也就是说把Observable的发射事件集合转换成Observable集合。
     * 然后观察者Observer最终观察的是Observable集合。但是观察者不能保证接收到这Observable集合发送事件的顺序。
     * 
     * 是不是很抽象? 先来看看这一个流程:  观察者.create(事件发射器).flatMap(转换器).subscribe(观察者)
     * 
     * 再来看看例子 : 下面的代码,一开始Observable通过发射器的onNext发送了0-9这10个事件发送出去,正常来说Observer接收到就是 0 - 9 这10个数据
     * 然而中间经过了flatMap的转换。这 10个事件都分别在Funcation转换器上新的Observable。而最终观察者接收的就是这10个新的Observable的发送事件。
     */

public static void flatMap() {

    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 10; i++) {
                        emitter.onNext(i);
                    }
                    emitter.onComplete();
                }
            })
            .flatMap(new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    List<String> list = new ArrayList<>();
                    list.add(String.valueOf(0));
                    list.add(String.valueOf(1));
                    return Observable.fromIterable(list).delay(100, TimeUnit.MILLISECONDS);
                }
            })
            .subscribe(
                    new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {

                        }

                        @Override
                        public void onNext(String s) {
                            Log.d(TAG + "flatMap", s);
                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onComplete() {
                            Log.d(TAG + "flatMap", "complete");
                        }
                    });
}
           

输出如下:

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

3)concatMap() 操作符

/**
     * ======================concatMap============================
     * 
     * 与上面的flatMap作用基本一样,与flatMap唯一不同的是concat能保证Observer接收到Observable集合发送事件的顺序
     */
    public static void concatMap() {

    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 10; i++) {
                        emitter.onNext(i);
                    }
                    emitter.onComplete();
                }
            })
            .concatMap(new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    List<String> list = new ArrayList<>();
                    list.add(String.valueOf(0));
                    list.add(String.valueOf(1));
                    return Observable.fromIterable(list).delay(100, TimeUnit.MILLISECONDS);
                }
            })
            .subscribe(
                    new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {

                        }

                        @Override
                        public void onNext(String s) {
                            Log.d(TAG + "flatMap", s);
                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onComplete() {
                            Log.d(TAG + "flatMap", "complete");
                        }
                    });
}
           

输出如下:

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

4)buffer() 操作符

/**
 * ========================buffer 操作符 ======================================
 * 
 * 把发射数据按照一定间隔分成若干段。按每段的数据转换成新的Observable,这个Observable把一段数据一次性发射出去。
 * 可以简单地理解为把一组数据分成若干小组发射出去,而不是单个单个地发射出去
 */
public static void buffer() {

    Observable
            .just(1, 2, 3, 4, 5, 6)
            .buffer(2)
            .subscribe(new Observer<List<Integer>>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(List<Integer> integers) {
                    for (Integer integer : integers) {
                        Log.d(TAG + "buffer", String.valueOf(integer));
                    }
                    Log.d(TAG + "buffer", "============================");
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "buffer", "onComplete");
                }
            });
}
           

输出如下:

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

3、 合并操作符:组合多个被观察者(Observable)&合并需要发送的事件。包含:concatMap(),concat(),merge(),mergeArray(),concateArray(),reduce(),collect(),startWith(),zip(),count()等

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

1)merge(),concat ()操作符

/**
     * ========================merge,concat 操作符 ======================================
     * 
     * merge操作符是把多个Observable合并成一个进行发射。merge可能会让合并到Observable的数据顺序发生错乱(组合被观察者数量<=4个)(并行无序)
     * mergeArray操作符和merge作用一样,但不同的是组合被观察者数量>4个)(并行无序)
     * 
     * concat操作符也是把多个Observable合并成一个进行发射。但concat则保证合并的每个Observable的事件按顺序发射出去。(组合被观察者数量<=4个)(串行有序)
     * concatArray操作符和concat作用一样,但不同的是组合被观察者数量>4个)(串行有序)
     */
    public static void merge() {
        Observable observable1 = Observable.just(1, 2, 3);
        Observable observable2 = Observable.just("哈哈", "嘻嘻", "啊啊");

    Observable
            .merge(observable1, observable2).delay(1, TimeUnit.SECONDS)
            .subscribe(new Observer() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Object o) {
                    Log.d(TAG + "merge", o.toString());
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "merge", "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

2)concatDelayError()/mergeDelayError() 操作符

/**
     * ========================concatDelayError()/mergeDelayError() 操作符 ======================================
     * 
     * 这两个操作符的作用是: 使用concat()和merge()操作符时,若其中一个被观察者发送onError事件,则会马上终止其它被观察者继续发送事件。所以呐,这时使用concatError()/
     * mergeDelayError()事件可以使onError事件推迟到其它被观察者发送事件结束后在再触发
     */
    public static void concatDelayError() {

    Observable
            .concatArrayDelayError(Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onError(new NullPointerException());
                    emitter.onNext(3);
                    emitter.onNext(4);
                }
            }), Observable.just(5, 6))


            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG + "cDelayError", String.valueOf(integer));
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG + "cDelayError", "onError");
                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "cDelayError", "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

3)zip 操作符

/**
     * ========================zip 操作符 ======================================
     * 
     * 把多个Observable合并后,并且把这些Observable的数据进行转换再发射出去。转换之后的数据数目由最短数据长度的那个Observable决定。发射完最终会自动调用观察者的onComplete方法()
     * 
     * 如以下代码: 数据长度为4的observable1和数据长度为3的observable2进行合并转换后,观察者只接收到3个数据
     */

public static void zip() {

    Observable observable1 = Observable.just(1, 2, 3, 4);
    Observable observable2 = Observable.just("哈哈", "嘻嘻", "啊啊");


    Observable
            .zip(observable1, observable2, new BiFunction<Integer, String, String>() {

                @Override
                public String apply(Integer integer, String s) throws Exception {
                    return s + integer;
                }
            })
            .subscribe(new Observer() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Object o) {
                    Log.d(TAG + "zip", o.toString());
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "merge", "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

4)combineLatest 操作符

/**
     * ========================combineLatest 操作符 ======================================
     * 
     * 当两个Observable 中的任何一个发送了数据,将先发送了数据的Observable的最新(最后)一个数据和另一个Observable发送的每个数据结合,最终基于该结合的结果发送数据
     * 
     * 与zip()的区别: zip()是按个数合并,即1对1合并;而combineLatest()是基于时间合并,,即在同一时间点上合并
     */

/**
     *
     *  ======================combineLatestDelayError =================================
     *
     *  作用类似于concatDelayError() / mergeDelayError(),用于错误处理

public static void combineLatest() {

    Observable
            .combineLatest(Observable.just(1, 2, 3)
                    , Observable.intervalRange(1, 4, 2, 1, TimeUnit.SECONDS)
                    , new BiFunction<Integer, Long, String>() {
                        @Override
                        public String apply(Integer integer, Long aLong) throws Exception {
                            return "合并后的数据为:" + integer + aLong;
                        }
                    })
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String s) {
                    Log.d(TAG + "combineLatest", s);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "combineLatest", "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

5)reduce ()操作符

/**
 * ======================reduce  操作符=================================
 * 
 * 把被观察者需要发送的数据按照指定规则聚合成一个数据发送
 * 
 * 聚合的规则需要我们编写,内部流程是前两个数据按照我们的规则合并后,再与后面的数据按规则合并,依次类推。这样说有点抽象,看下面的例子。
 */
public static void reduce() {

    Observable
            .just(1, 2, 3, 4, 5)
            .reduce(new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer integer, Integer integer2) throws Exception {
                    Log.d(TAG + "reduce", "本次合并的过程是:  " + integer + "+" + integer2);
                    return integer + integer2;
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "reduce", "最终计算的结果是 :  " + integer);
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

6)collect() 操作符

/**
 * ========================collect 操作符=================================
 * 
 * 作用是把 Observable(被观察者)发送的事件收集到一个数据结构中
 */
public static void collect() {

    Observable
            .just(1, 2, 3, 4, 5)
            .collect(new Callable<ArrayList<Integer>>() {
                @Override
                public ArrayList<Integer> call() throws Exception {
                    return new ArrayList<>();
                }
            }, new BiConsumer<ArrayList<Integer>, Integer>() {
                @Override
                public void accept(ArrayList<Integer> integers, Integer integer) throws Exception {
                    integers.add(integer);
                }
            })
            .subscribe(new Consumer<ArrayList<Integer>>() {
                @Override
                public void accept(ArrayList<Integer> integers) throws Exception {
                    Log.d(TAG + "collect", integers.toString());
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

7)startWith()/startWithArray() 操作符

/**
    * ========================startWith/startWithArray 操作符=================================
    * 
    * 在一个被观察者发送时间前,追加发送一些数据/一个新的被观察者
    */
    public static void startWith() {

    Observable.just(7, 8, 9)
            .startWith(6)   //在发送序列去追加单个数据
            .startWithArray(4, 5)  //在发送序列去追加多个数据
            .startWith(Observable.just(1, 2, 3))  //在发送序列去追加单个被观察者
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "startWith", String.valueOf(integer));
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

8)count() 操作符

/**
    * ========================count 操作符=================================
    * 
    * 统计被观察者发送事件数量
    */
    public static void count() {
        Observable
                .just(1, 2, 3, 4)
                .count()
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG + "count", "发送事件的数量 : " + aLong);
                    }
                });
    }
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

4、功能操作符:辅助被观察者(Observable) 发送事件时实现一些功能性需求,如错误处理,线程调度。

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

1) subscribe() 操作符

/**
     * ==================subscribe 操作符===========================
     *
     *  连接被观察者和观察者
     */
    public static void subscribe() {

    //创建被观察者
    Observable observable = Observable.create(new ObservableOnSubscribe() {
        @Override
        public void subscribe(ObservableEmitter emitter) throws Exception {
            emitter.onNext("事件");
        }
    });

    //创建观察者
    Observer observer = new Observer() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG + "subscribe", "开始连接");
        }

        @Override
        public void onNext(Object o) {
            Log.d(TAG + "subscribe", "收到事件");
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    };

    //通过subscribe 进行 被观察者(Observable)与观察者(Observer)的连接
    observable.subscribe(observer);

}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

2) delay() 操作符

/**
     * ==================delay 操作符=======================================
     * 
     * 延迟发送事件
     * 
     * delay有多个重载方法:
     * 
     * delay(long delay,TimeUnit unit) :指定延迟时间。 参数一:时间 ; 参数二:时间单位
     * 
     * delay(long delay, TimeUnit unit, Scheduler scheduler)  指定延迟时间&线程调度器。参数一:时间 ; 参数二:时间单位;参数三: 线程调度器
     * 
     * delay(long delay, TimeUnit unit, boolean delayError)  指定延迟时间&线程调度器。参数一:时间 ; 参数二:时间单位;参数三: 是否错误延迟
     * 
     * delay(long delay, TimeUnit unit, Scheduler scheduler, boolean delayError)  指定延迟时间&线程调度器&错误延迟。参数一:时间 ; 参数二:时间单位;
     * 参数三: 线程调度器; 参数四:是否错误延迟(若中间发生错误,是否如常执行,执行完在执行onError())
     */
    public static void delay() {

    Observable
            .just(1, 2)
            .delay(10, TimeUnit.SECONDS)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "delay", String.valueOf(integer));
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

3) do 系列操作符

/**
     * ========================do 系列操作符 =======================================
     * 
     * 在事件发送&接收的整个周期过程中进行操作。
     * 
     * 如发送事件前的操作,发送事件后的回调请求
     * 
     * do系列操作符包含以下:
     * 
     * doOnEach() :当Observable每发送一次事件就会调用一次(包含onNext(),onError(),onComplete())
     * doOnNext(): 执行 onNext()前调用
     * doAfterNext(): 执行onNext()后调用
     * doOnComplete():执行onComplete()前调用
     * doOnError():执行 onError()前调用
     * doOnTerminate(): 执行终止(无论正常发送完毕/异常终止)
     * doFinally(): 最后执行
     * doOnSubscribe() :观察者订阅是调用
     * doOnUnScbscribe(): 观察者取消订阅时调用
     */

public static void dos() {
    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onError(new NullPointerException());
                }
            })
            .doOnEach(new Consumer<Notification<Integer>>() {
                @Override
                public void accept(Notification<Integer> integerNotification) throws Exception {
                    Log.d(TAG + "doOnEach", "doOnEach:  " + String.valueOf(integerNotification.getValue()));
                }
            })
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "doOnNext", "doOnNext:  " + String.valueOf(integer));
                }
            })
            .doAfterNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "doAfterNext", "doAfterNext:  " + String.valueOf(integer));
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    Log.d(TAG + "doOnComplete", "doOnComplete");
                }
            })
            .doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.d(TAG + "doOnError", "doOnError");
                }
            })
            .doOnTerminate(new Action() {
                @Override
                public void run() throws Exception {
                    Log.d(TAG + "doOnTerminate", "doOnTerminate");
                }
            })
            .doAfterTerminate(new Action() {
                @Override
                public void run() throws Exception {
                    Log.d(TAG + "doAfterTermi", "doAfterTerminate");
                }
            })
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    Log.d(TAG + "doOnSubscribe", "doOnSubscribe");
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    Log.d(TAG + "doFinally", "doFinally");
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "收到的数据:  " + integer);
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError");
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });

}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

4) onErrorReturn() 操作符

/** ====================onErrorReturn() 操作符 ======================
     * 
     * 可以捕获错误。遇到错误时,发送一个特殊事件,并且正常终止.注意后面的事件不会再发送
     */
    public static void onErrorReturn() {

    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onError(new Throwable("Throwable"));
                    emitter.onNext(3);

                }
            })
            .onErrorReturn(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable throwable) throws Exception {
                    Log.e(TAG, "发生了错误:  " + throwable.getMessage());
                    return 404;
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, String.valueOf(integer));
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, e.getMessage());
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });

}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

5) onExceptionResumeNext()/onErrorResumeNext() 操作符

/**
 * ====================onExceptionResumeNext()/onErrorResumeNext() 操作符 ======================
 * 
 * 遇到错误时发送一个新的Observable 。并且正常终止.注意原Observable后面的事件不会再发送
 * 
 * 如果捕获Exception的话使用onExceptionResumeNext() ,捕获错误的用onErrorResumeNext()
 */
public static void onExceptionResumeNext() {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onError(new NullPointerException("NullPointerException"));
            emitter.onNext(3);
        }
    }).onExceptionResumeNext(new Observable<Integer>() {
        @Override
        protected void subscribeActual(Observer<? super Integer> observer) {
            observer.onNext(4);
            observer.onNext(5);

        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, String.valueOf(integer));
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

6) retry() 操作符

/**
 * ====================retry() 操作符 ======================
 * 
 * 作用是:出现错误时,让被观察者重新发送数据
 * 注:若发送错误,则一直重新发送
 * 
 * 有几个重载方法:
 * retry() : 出现错误时,让被观察者重新发送数据。若错误一直发生,则一直重新发送
 * 
 * retry(long time):与retry不同的书,若错误一直发生,被观察者则一直重新发送数据,但这持续重新发送有次数限制
 * 
 * retry(Predicate predicate) :  出现错误时,根据指定逻辑(可以捕获到发生的错误)决定是否让被观察者重新发送数据
 * 
 * retry(new BiPredicate<Integer, Throwable>):出现错误时,根据指定逻辑(可以捕获重发的次数和发生的错误)决定是否让被观察者重新发送数据
 * 
 * retry(long time,Predicate predicate) : 出现错误时,根据指定逻辑(可以捕获到发生的错误)决定是否让被观察者重新发送数据。并且有持续重发的次数限制
 */
public static void retry() {

    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onError(new Throwable("发生错误了"));
                    emitter.onNext(3);
                }
            })
            .retry(new BiPredicate<Integer, Throwable>() {
                @Override
                public boolean test(Integer integer, Throwable throwable) throws Exception {

                    // interger 为重试次数 ,throwable 为捕获到的异常

                    Log.e(TAG + "retry", throwable.getMessage());
                    Log.e(TAG + "integer", "重试次数: " + integer);

                    //return true : 重新发送请求(若持续遇到错误,就持续重新发送)
                    //return false :    不重新发送数据 并且调用观察者的onError()方法结束

                    if (integer > 2)
                        return false;
                    return true;
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {
                    Log.e(TAG + "retry", String.valueOf(integer));
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError");
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

7) retryUntil() 操作符

/**
     * ===================retryUntil() 操作符============================
     * 
     * 发送事件遇到错误,指定规则是否重新发送。retry(Predicate predicate)。
     * 
     *  return true : 不重新发送请求,并且调用观察者的onError()方法结束
     *  return false : 重新发送数据(若持续遇到错误,就持续重新发送)
     */
    public static void retryUntil() {

    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onError(new Throwable("发生错误了"));
                    emitter.onNext(3);
                }
            })
            .retryUntil(new BooleanSupplier() {
                @Override
                public boolean getAsBoolean() throws Exception {

                     //return true : 不重新发送请求,并且调用观察者的onError()方法结束
                    // return false : 重新发送数据(若持续遇到错误,就持续重新发送)
                    return false;
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {
                    Log.e(TAG + "retryUntil", String.valueOf(integer));
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "retryUntil", "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

8) retryWhen() 操作符

/**
     * ===================retryWhen() 操作符============================
     * 
     * 遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者(Observable) &  发送事件
     */
    public static void retryWhen() {
        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onError(new Throwable("发送了错误"));
                        emitter.onNext(3);
                    }
                })
                //遇到Error时会回调
                .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                        return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                            @Override
                            public ObservableSource<?> apply(Throwable throwable) throws Exception {

                            //1、若返回的Observable发送的事件 = Error ,则原始的Observable则不重新发送事件。该异常信息可在观察者的onError中获得
                            //return Observable.error(throwable);

                            //2、若返回的Observable发送的事件= Next事件(和next的内容无关),则原始的Observable重新发送事件(若持续遇到错误,则持续发送)
                            return Observable.just(5); //仅仅是作为一个触发重新订阅原被观察者的通知,什么数据并不重要,只有不是onComplete/onError事件
                        }
                    });

                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG + "retryWhen", String.valueOf(integer));
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG + "retryWhen", e.getMessage());
                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "retryWhen", "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

9) repeat() 操作符

/**
     * ===============repeat() 操作符==============
     * 
     * repeat操作符的作用是重复发射 observable的数据序列,可以使无限次也可以是指定次数.不传时为重复无限次
     */
    public static void repeat() {

    Observable
            .just(1, 2, 3)
            .repeat(3)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "repeat", String.valueOf(integer));
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

10)repeatWhen() 操作符

/**
 *  ===============repeatWhen() 操作符==============
 *
 * 将原始 Observable 停止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)
 * ,以此决定是否重新订阅 & 发送原来的 Observable
 */
public static void repeatWhen() {

    Observable
            .just(1, 2, 4)
            .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
                    return  objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Object o) throws Exception {

                            //若新被观察者(Observable)返回1个Complete()/  Error()事件,则不重新订阅 & 发送原来的 Observable
                            //Observable.empty() = 发送Complete事件,但不会回调观察者的onComplete()
                            return Observable.empty();

                            // return Observable.error(new Throwable("不再重新订阅事件"));
                            // 返回Error事件 = 回调onError()事件,并接收传过去的错误信息。

                            // 情况2:若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable
                            // return Observable.just(1);
                            // 仅仅是作为1个触发重新订阅被观察者的通知,发送的是什么数据并不重要,只要不是Complete() /  Error()事件
                        }
                    });
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

11)debounce() 操作符

/**
     * ===============debounce() 操作符==============
     * <p>
     * 一定的时间内没有操作就会发送事件(只会发送最后一次操作的事件)。
     * <p>
     * 以下的例子: 发送5个事件,每个事件间隔1秒。但是debounce限定了2秒内没有任何操作才会真正发送事件。所以只有最后一次满足条件,只能接收到事件 5
     */
    public static void debounce() {

    Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
            .debounce(2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.d(TAG + "debounce", String.valueOf(aLong));
                }
            });

}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

12)subscribeOn / ObserverOn 操作符

/**
     * ===============subscribeOn() 操作符==============
     * ===============observerOn() 操作符==============
     * <p>
     * <p>
     * subscribeOn : 发送事件的线程
     * observerOn: 接收事件的线程
     * <p>
     * 线程调度器:
     * Schedulers.io(): 代表io操作的线程,通常用于网络,读写文件等io密集型的操作
     * Schedulers.compucation(): 代表CPU计算密集型的操作,例如需要大量计算的操作
     * Schedulers.newThread(): 代表一个常规的新线程
     * AndroidSchedulers。mainThread(): 代表Android的主线程
     */
    public static void subscribeOn_observerOn() {

    Observable
            .create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("事件");
                    Log.d(TAG + "subscribeOn_ObserverOn", "发送事件:" + Thread.currentThread().getName());
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG + "subscribeOn_ObserverOn", "接收事件:   " + Thread.currentThread().getName());
                }
            });

}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

5、过滤操作符:用于将Observable发送的数据进行过滤和选择。让Observable返回我们所需要的数据。过滤操作符有buffer(),filter(),skip(),take(),skipLast(),takeLast(),throttleFirst(),distainctUntilChange()。

Rxjava操作符一、RxJava的基础使用二、RxJava操作符

1)filter() 操作符

/**
     * ========================filter() 操作符 ======================================
     * 
     * 对被观察者发送的事件做过滤操作。只有符合筛选条件的事件才会被观察者所接收。
     * 
     * return true : 继续发送
     * 
     * return false : 不发送
     */
    public static void filter() {

    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 3; i++) {
                        emitter.onNext(i);
                    }
                }
            })
            .filter(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
                    //return true : 继续发送
                    //return false : 不发送
                    return integer != 2; // 本例子过滤了 等于2的情况
                }
            })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "filter", String.valueOf(integer));
                }
            });

}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

2)distinct() 操作符

/**
     * ========================distinct 操作符 ======================================
     * 
     * 简单地说就是去重。发射的数据包含重复的,会将重复的筛选掉。也就是,它只允许还没有被发射过的数据通过,被观察者接收。发射完数据会自动调用onComplete()方法
     * 
     * y以下代码:观察者只会接收到 : 1,2,3,5 四个数值
     */
    public static void distinct() {
        Observable
                .just(1, 2, 3, 2, 3, 5)
                .distinct()
                .subscribe(
                        new Observer<Integer>() {
                            @Override
                            public void onSubscribe(Disposable d) {

                        }

                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG + "distinct", String.valueOf(integer));
                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onComplete() {
                            Log.d(TAG + "distinct", "onComplete");
                        }
                    });

}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

3)distinctUntilChanged() 操作符

/**
     * ========================distinctUntilChanged 操作符 ======================================
     * 
     * 过滤掉连续重复的事件
     */
    public static void distinctUntilChanged() {
        Observable.just(1, 2, 3, 1, 2, 3, 3, 4, 4)
                .distinctUntilChanged()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, String.valueOf(integer));
                    }
                });
    }
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

4)skip(),skipLast()操作符

/**
 * ========================skip,skipLast======================================
 *
 *  skip 操作符是把Observable发射的数据过滤点掉前n项。而take操作符只能接收前n项。另外还有skipLast和takeLast则是从后往前进行过滤.接收完会调用onComplete()方法
 *
 */

/**
 * 以下代码输出: 3,4,5
 */
public static void skip() {
    Observable
            .just(1, 2, 3, 4, 5, 6, 7)
            .skip(2)  //过滤前2项
            .skipLast(3) //过滤后3项
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG + "skip", "根据顺序过滤" + String.valueOf(integer));
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "skip", "onComplete");
                }
            });
//-------------------------------------------------------------------------------------------------
        Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
                .skip(1, TimeUnit.SECONDS)   //过滤地1s发送的数据
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Long aLong) {
                    Log.d(TAG + "skip", "根据事件过滤" + String.valueOf(aLong));
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });


}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

5)take() 操作符

/**
     * ====================== take 操作符===========================
     * 只能接收两个事件
     */
    public static void take() {
        Observable
                .just(1, 2, 3, 4, 5, 6, 7)
                .take(2)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG + "skip", String.valueOf(integer));
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "skip", "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

6)takeLast() 操作符

/**
     * =======================takeLast() 操作符 ======================
     * 
     * 只能接收被观察者发送的最后几个事件
     */

public static void takeLast() {
    Observable
            .just(1, 2, 3, 4, 5, 6, 7)
            .takeLast(2)
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG + "skip", String.valueOf(integer));
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    Log.d(TAG + "skip", "onComplete");
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

7)elementAt() 操作符

/**
     * ========================elementAt() 操作符 ======================================
     * 
     * 只发射第n项数据.
     *  一个参数和两个参数时:
     *  elementAt(第n项)
     * elementAt(第n项,第N项不存在时默认值)
     * 
     *  n为负数时,报IndexOUtOfBoundExection。为正数但超过发射数据长度不会报异常会使用默认值代替
     */

public static void elementAt() {

    Observable.range(1, 5)
            .elementAt(6, 10)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "elementAt", String.valueOf(integer));
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

8)elementAtOrError()操作符

/**
     * ==============elementAtOrError()===================================================
     * 
     * 在elementAtError()的基础上,当出现越界情况(当获取位置的索引>事件序列的长度),即抛出异常
     */
    public static void elementAtOrError() {
        Observable.range(1, 5)
                .elementAtOrError(6)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "elementAtOrErr", String.valueOf(integer));
                    }
                });
    }
           

9)ignoreElements() 操作符

/**
     * ========================ignoreElements() 操作符 ======================================
     * 
     * 不管发射的数据.只希望在它完成时和遇到错误时收到通知
     */
    public static void ignoreElements() {

    Observable
            .range(0, 10)
            .ignoreElements()
            .subscribe(new Action() {
                @Override
                public void run() throws Exception {
                    Log.d(TAG + "ignoreEles", "完成了");
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.d(TAG + "ignoreEles", "出错了");
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

10)ofType() 操作符

/**
     * ========================ofType() 操作符 ======================================
     * 
     * 通过数据的类型过滤数据,只发送指定类型数据。
     * 
     * 以下代码观察者只接收到: 1,2
     */
    public static void ofType() {

    Observable.just("哈哈", 1, "嘻嘻", 2, 3.5)
            .ofType(Integer.class)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG + "ofType", String.valueOf(integer));
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

11)throttleFirst()/throttleLast() 操作符

/**
     * ========================throttleFirst()/throttleLast() 操作符 ======================================
     * 
     * throttleFirst() 在某段时间内,只发送该段事件第一次事件
     * 
     * throttleLast()  在某段时间内,只发送该段事件最后一次事件
     */

public static void throttleFirst() {
    Observable.interval(300, TimeUnit.MILLISECONDS) //每个0.3秒发送一个事件
            .throttleFirst(1, TimeUnit.SECONDS) //只接收每秒内发送的第一个数据
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.d(TAG + "throttleFirst", String.valueOf(aLong));
                }
            });
}
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

12)sample()

/**
     * ===================sample()=================================
     * 
     * 在某段时间内,只发送该段时间内最新(最后)1次事件
     * 
     * 与throttleLast类似
     */
    public static void sample() {
        Observable.interval(300, TimeUnit.MILLISECONDS) //每个0.3秒发送一个事件
                .sample(1, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG + "sample", String.valueOf(aLong));
                    }
                });
    }
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

13)firstElement()/lastElement()

/**
     * ========================firstElement()/lastElement()==========================
     * 
     * 选取第一个元素/最后一个元素
     */
    public static void firstElement() {
        Observable.just(1, 2, 3)
                .firstElement()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "firstElement", String.valueOf(integer));
                    }
                });
    }
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符
public static void lastElement() {
	        Observable
	                .just(1, 2, 3)
	                .lastElement()
	                .subscribe(new Consumer<Integer>() {
	                    @Override
	                    public void accept(Integer integer) throws Exception {
	                        Log.d(TAG + "lastElement", String.valueOf(integer));
	                    }
	                });
	    }
           
Rxjava操作符一、RxJava的基础使用二、RxJava操作符

6、条件 / 布尔操作符:通过设置函数,判断被观察者(Observable)发送的事件是否符合条件

Rxjava操作符一、RxJava的基础使用二、RxJava操作符
Rxjava操作符一、RxJava的基础使用二、RxJava操作符