目录
- 一、RxJava的基础使用
-
- 1. 定义
- 2. 作用
- 3. 特点
- 4.Rxjava原理介绍
- 5. 基本使用
-
- 5.1.1 使用步骤
- 5.1.2 步骤详解
- 二、RxJava操作符
-
- 1、创建操作符 :创建被观察者(Observable)对象&发送事件
-
- 1)Create() 操作符
- 2)Just() 操作符
- 3)fromIterable() 操作符
- 4)timer()操作符
- 5)fromArray() 操作符
- 6)interval() 定时器
- 7)intervalRange() 操作符
- 8)Range() 操作符
- 2、转换操作符:变换被观察者(Observable)发送的事件。将Observable发送的数据按照一定的规则做一些变换,然后再将变换的数据发射出去。变换的操作符有map,flatMap,concatMap,switchMap,buffer,groupBy等等。
-
- 1)map()操作符
- 2)flatMap()操作符
- 3)concatMap() 操作符
- 4)buffer() 操作符
- 3、 合并操作符:组合多个被观察者(Observable)&合并需要发送的事件。包含:concatMap(),concat(),merge(),mergeArray(),concateArray(),reduce(),collect(),startWith(),zip(),count()等
-
- 1)merge(),concat ()操作符
- 2)concatDelayError()/mergeDelayError() 操作符
- 3)zip 操作符
- 4)combineLatest 操作符
- 5)reduce ()操作符
- 6)collect() 操作符
- 7)startWith()/startWithArray() 操作符
- 8)count() 操作符
- 4、功能操作符:辅助被观察者(Observable) 发送事件时实现一些功能性需求,如错误处理,线程调度。
-
- 1) subscribe() 操作符
- 2) delay() 操作符
- 3) do 系列操作符
- 4) onErrorReturn() 操作符
- 5) onExceptionResumeNext()/onErrorResumeNext() 操作符
- 6) retry() 操作符
- 7) retryUntil() 操作符
- 8) retryWhen() 操作符
- 9) repeat() 操作符
- 10)repeatWhen() 操作符
- 11)debounce() 操作符
- 12)subscribeOn / ObserverOn 操作符
- 5、过滤操作符:用于将Observable发送的数据进行过滤和选择。让Observable返回我们所需要的数据。过滤操作符有buffer(),filter(),skip(),take(),skipLast(),takeLast(),throttleFirst(),distainctUntilChange()。
-
- 1)filter() 操作符
- 2)distinct() 操作符
- 3)distinctUntilChanged() 操作符
- 4)skip(),skipLast()操作符
- 5)take() 操作符
- 6)takeLast() 操作符
- 7)elementAt() 操作符
- 8)elementAtOrError()操作符
- 9)ignoreElements() 操作符
- 10)ofType() 操作符
- 11)throttleFirst()/throttleLast() 操作符
- 12)sample()
- 13)firstElement()/lastElement()
- 6、条件 / 布尔操作符:通过设置函数,判断被观察者(Observable)发送的事件是否符合条件
一、RxJava的基础使用
1. 定义
RxJava 在 GitHub 的介绍:
RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
// 翻译:RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
总结:RxJava 是一个 基于事件流、实现异步操作的库
2. 作用
实现异步操作
类似于 Android中的 AsyncTask 、Handler作用
3. 特点
由于 RxJava的使用方式是:基于事件流的链式调用,所以使得 RxJava:
- 逻辑简洁
- 实现优雅
-
使用简单
更重要的是,随着程序逻辑的复杂性提高,它依然能够保持简洁 & 优雅
4.Rxjava原理介绍
- Rxjava原理 基于 一种扩展的观察者模式
- Rxjava的扩展观察者模式中有4个角色:
角色 | 作用 | 类比 |
---|---|---|
被观察者(Observable) | 产生事件 | 顾客 |
观察者(Observer) | 接收事件,并给出响应动作 | 厨房 |
订阅(Subscribe) | 连接 被观察者 & 观察者 | 服务员 |
事件(Event) | 被观察者 & 观察者 沟通的载体 | 菜式 |
具体原理
结合 顾客到饭店吃饭 的生活例子理解:
即RxJava原理可总结为:被观察者 (Observable) 通过 订阅(Subscribe) 按顺序发送事件 给观察者 (Observer), 观察者(Observer) 按顺序接收事件 & 作出对应的响应动作。具体如下图:
5. 基本使用
5.1.1 使用步骤
5.1.2 步骤详解
加入依赖:
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
具体实现:
public class MainActivity extends AppCompatActivity {
private static final String TAG = "Rxjava";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// 步骤1:创建被观察者 Observable & 生产事件
// 即 顾客入饭店 - 坐下餐桌 - 点菜
// 1. 创建被观察者 Observable 对象
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
// 2. 在复写的subscribe()里定义需要发送的事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// 通过 ObservableEmitter类对象产生事件并通知观察者
// ObservableEmitter类介绍
// a. 定义:事件发射器
// b. 作用:定义需要发送的事件 & 向观察者发送事件
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
// 步骤2:创建观察者 Observer 并 定义响应事件行为
// 即 开厨房 - 确定对应菜式
Observer<Integer> observer = new Observer<Integer>() {
// 通过复写对应方法来 响应 被观察者
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件"+ value +"作出响应" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
};
// 步骤3:通过订阅(subscribe)连接观察者和被观察者
// 即 顾客找到服务员 - 点菜 - 服务员下单到厨房 - 厨房烹调
observable.subscribe(observer);
基于事件流的链式调用方式:
public class MainActivity extends AppCompatActivity {
private static final String TAG = "Rxjava";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// RxJava的流式操作
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 创建被观察者 & 生产事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
// 2. 通过通过订阅(subscribe)连接观察者和被观察者
// 3. 创建观察者 & 定义响应事件的行为
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件"+ value +"作出响应" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
}
}
二、RxJava操作符
1、创建操作符 :创建被观察者(Observable)对象&发送事件
1)Create() 操作符
/**
*
* =======================基本用法Create====================================
*
* Observable 被观察者
*
* ObservableOnSubscribe 观察者与被观察者的桥接(事件发射器)
*
* ObServer 观察者
*
* 被观察者 --> 观察者与被观察者的桥接 --> 观察者
*
* 被观察者.create(观察者与被观察者的桥接).subscribe(观察者)
*
*/
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
for (int i = 0; i < 10; i++) {
emitter.onNext(String.valueOf(i));
}
emitter.onComplete();
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
}
输出如下:
2)Just() 操作符
/**
* ====================just 操作符 ====================
*
* 此操作符的作用是将传入的数据依次发送出去.最多可以传10个参数
*
* 以下代码会依次把 1-10的字符串发送出去。执行10此观察者的onNext方法,最后默认执行onComplete方法
*/
public static void just() {
Observable
.just("1", "2", "3", "4", "5",
"6", "7", "8", "9", "10")
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG + "just", s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "just", "complete");
}
});
}
输出如下:
3)fromIterable() 操作符
/**
* ====================fromIterable 操作符 ====================
*
* 此操作符的作用是将传入的数组集合按脚标依次发送出去
*
* 以下代码会依次把 0-9的字符串发送出去。执行10此观察者的onNext方法,最后默认执行onComplete方法
*/
public static void fromIterable() {
List<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(String.valueOf(i));
}
Observable
.fromIterable(list)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG + "fromIterable", s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "fromIterable", "complete");
}
});
}
输出如下:
4)timer()操作符
/**
* ==========================timer操作符 ==============================
*
* 延迟指定时间发送一个0数值(Long类型)
*
* timer操作符主要运行在一个新线程中,也可以自定义线程调度器(第三个参数)
*/
public static void timer() {
Observable
.timer(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "timer", String.valueOf(aLong));
}
});
}
输出如下:
5)fromArray() 操作符
/**
* ====================fromArray 操作符============================
*
* 对一个数组集合进行观察,把数组一次性发给观察者,只会执行一次观察者的onNext,最后默认执行onComplete方法
*/
public static void fromArray() {
List<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(String.valueOf(i));
}
Observable
.fromArray(list)
.subscribe(new Observer<List<String>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<String> strings) {
Log.d(TAG + "fromArray", strings.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "fromArray", "complete");
}
});
}
输出如下:
6)interval() 定时器
/**
* ====================interval 定时器====================
*
* 这个相当于定时器,用它可以取代CountDownTimer。它会按照设定的间隔时间,每次发送一个事件,发送的事件序列:默认从0开始,无限递增的整数序列
*
* 以下代码输出: 0 ----(5秒后)-----1-----(5秒后)------2---------(5秒后)--------3-------(5秒后)-----.......
*/
public static void interval() {
Observable
.interval(5, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "interval", String.valueOf(aLong));//从0开始输出
}
});
}
输出如下:
7)intervalRange() 操作符
/**
* intervalRange 操作符
* <p>
* 作用和interval相同,但可以指定发送数据的数量
*/
public static void intervalRange() {
/**
* 参数1: 起始发送值
* 参数2:发送数量
* 参数3:首次发送延迟事件
* 参数4:每次发送事件间隔
* 参数5:时间单位
*
*/
Observable
.intervalRange(2, 10, 3, 1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "intervalRan", String.valueOf(aLong));//从2开始输出
}
});
}
输出如下:
8)Range() 操作符
/**
* Range 操作符
* <p>
* 作用发送指定范围的序列,可指定范围.作用类似intervalRange,但不同的是range是无延迟发送
*/
public static void range() {
Observable
.range(2, 6)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "interval", String.valueOf(integer));//从2开始输出
}
});
}
输出如下:
2、转换操作符:变换被观察者(Observable)发送的事件。将Observable发送的数据按照一定的规则做一些变换,然后再将变换的数据发射出去。变换的操作符有map,flatMap,concatMap,switchMap,buffer,groupBy等等。
1)map()操作符
/**
* ======================map============================
*
* map操作符,可以说是的被观察者转换器。 通过指定一个Funcation对象,将被观察者(Observable)转换成新的被观察者(Observable)对象并发射,观察者会收到新的被观察者并处理
*
*
* 本来发射的数据是 数字1,然后观察者接收到的是 “ 这是新的观察数据===: 1”
*
* 流程: 被观察者.create(事件发射器).map(转换器).subscribe(观察者)
*/
public static void map() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 10; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}
})
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "这是新的观察数据===:" + integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG + "map", s);
}
});
}
输出如下:
2)flatMap()操作符
/**
* ======================flatMap============================
*
* flatMap操作符, 将Observable每一次发射的事件都转换成一个Observable,也就是说把Observable的发射事件集合转换成Observable集合。
* 然后观察者Observer最终观察的是Observable集合。但是观察者不能保证接收到这Observable集合发送事件的顺序。
*
* 是不是很抽象? 先来看看这一个流程: 观察者.create(事件发射器).flatMap(转换器).subscribe(观察者)
*
* 再来看看例子 : 下面的代码,一开始Observable通过发射器的onNext发送了0-9这10个事件发送出去,正常来说Observer接收到就是 0 - 9 这10个数据
* 然而中间经过了flatMap的转换。这 10个事件都分别在Funcation转换器上新的Observable。而最终观察者接收的就是这10个新的Observable的发送事件。
*/
public static void flatMap() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 10; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}
})
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
list.add(String.valueOf(0));
list.add(String.valueOf(1));
return Observable.fromIterable(list).delay(100, TimeUnit.MILLISECONDS);
}
})
.subscribe(
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG + "flatMap", s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "flatMap", "complete");
}
});
}
输出如下:
3)concatMap() 操作符
/**
* ======================concatMap============================
*
* 与上面的flatMap作用基本一样,与flatMap唯一不同的是concat能保证Observer接收到Observable集合发送事件的顺序
*/
public static void concatMap() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 10; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}
})
.concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
list.add(String.valueOf(0));
list.add(String.valueOf(1));
return Observable.fromIterable(list).delay(100, TimeUnit.MILLISECONDS);
}
})
.subscribe(
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG + "flatMap", s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "flatMap", "complete");
}
});
}
输出如下:
4)buffer() 操作符
/**
* ========================buffer 操作符 ======================================
*
* 把发射数据按照一定间隔分成若干段。按每段的数据转换成新的Observable,这个Observable把一段数据一次性发射出去。
* 可以简单地理解为把一组数据分成若干小组发射出去,而不是单个单个地发射出去
*/
public static void buffer() {
Observable
.just(1, 2, 3, 4, 5, 6)
.buffer(2)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<Integer> integers) {
for (Integer integer : integers) {
Log.d(TAG + "buffer", String.valueOf(integer));
}
Log.d(TAG + "buffer", "============================");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "buffer", "onComplete");
}
});
}
输出如下:
3、 合并操作符:组合多个被观察者(Observable)&合并需要发送的事件。包含:concatMap(),concat(),merge(),mergeArray(),concateArray(),reduce(),collect(),startWith(),zip(),count()等
1)merge(),concat ()操作符
/**
* ========================merge,concat 操作符 ======================================
*
* merge操作符是把多个Observable合并成一个进行发射。merge可能会让合并到Observable的数据顺序发生错乱(组合被观察者数量<=4个)(并行无序)
* mergeArray操作符和merge作用一样,但不同的是组合被观察者数量>4个)(并行无序)
*
* concat操作符也是把多个Observable合并成一个进行发射。但concat则保证合并的每个Observable的事件按顺序发射出去。(组合被观察者数量<=4个)(串行有序)
* concatArray操作符和concat作用一样,但不同的是组合被观察者数量>4个)(串行有序)
*/
public static void merge() {
Observable observable1 = Observable.just(1, 2, 3);
Observable observable2 = Observable.just("哈哈", "嘻嘻", "啊啊");
Observable
.merge(observable1, observable2).delay(1, TimeUnit.SECONDS)
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
Log.d(TAG + "merge", o.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "merge", "onComplete");
}
});
}
2)concatDelayError()/mergeDelayError() 操作符
/**
* ========================concatDelayError()/mergeDelayError() 操作符 ======================================
*
* 这两个操作符的作用是: 使用concat()和merge()操作符时,若其中一个被观察者发送onError事件,则会马上终止其它被观察者继续发送事件。所以呐,这时使用concatError()/
* mergeDelayError()事件可以使onError事件推迟到其它被观察者发送事件结束后在再触发
*/
public static void concatDelayError() {
Observable
.concatArrayDelayError(Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new NullPointerException());
emitter.onNext(3);
emitter.onNext(4);
}
}), Observable.just(5, 6))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG + "cDelayError", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
Log.d(TAG + "cDelayError", "onError");
}
@Override
public void onComplete() {
Log.d(TAG + "cDelayError", "onComplete");
}
});
}
3)zip 操作符
/**
* ========================zip 操作符 ======================================
*
* 把多个Observable合并后,并且把这些Observable的数据进行转换再发射出去。转换之后的数据数目由最短数据长度的那个Observable决定。发射完最终会自动调用观察者的onComplete方法()
*
* 如以下代码: 数据长度为4的observable1和数据长度为3的observable2进行合并转换后,观察者只接收到3个数据
*/
public static void zip() {
Observable observable1 = Observable.just(1, 2, 3, 4);
Observable observable2 = Observable.just("哈哈", "嘻嘻", "啊啊");
Observable
.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return s + integer;
}
})
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
Log.d(TAG + "zip", o.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "merge", "onComplete");
}
});
}
4)combineLatest 操作符
/**
* ========================combineLatest 操作符 ======================================
*
* 当两个Observable 中的任何一个发送了数据,将先发送了数据的Observable的最新(最后)一个数据和另一个Observable发送的每个数据结合,最终基于该结合的结果发送数据
*
* 与zip()的区别: zip()是按个数合并,即1对1合并;而combineLatest()是基于时间合并,,即在同一时间点上合并
*/
/**
*
* ======================combineLatestDelayError =================================
*
* 作用类似于concatDelayError() / mergeDelayError(),用于错误处理
public static void combineLatest() {
Observable
.combineLatest(Observable.just(1, 2, 3)
, Observable.intervalRange(1, 4, 2, 1, TimeUnit.SECONDS)
, new BiFunction<Integer, Long, String>() {
@Override
public String apply(Integer integer, Long aLong) throws Exception {
return "合并后的数据为:" + integer + aLong;
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG + "combineLatest", s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "combineLatest", "onComplete");
}
});
}
5)reduce ()操作符
/**
* ======================reduce 操作符=================================
*
* 把被观察者需要发送的数据按照指定规则聚合成一个数据发送
*
* 聚合的规则需要我们编写,内部流程是前两个数据按照我们的规则合并后,再与后面的数据按规则合并,依次类推。这样说有点抽象,看下面的例子。
*/
public static void reduce() {
Observable
.just(1, 2, 3, 4, 5)
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
Log.d(TAG + "reduce", "本次合并的过程是: " + integer + "+" + integer2);
return integer + integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "reduce", "最终计算的结果是 : " + integer);
}
});
}
6)collect() 操作符
/**
* ========================collect 操作符=================================
*
* 作用是把 Observable(被观察者)发送的事件收集到一个数据结构中
*/
public static void collect() {
Observable
.just(1, 2, 3, 4, 5)
.collect(new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> integers, Integer integer) throws Exception {
integers.add(integer);
}
})
.subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(ArrayList<Integer> integers) throws Exception {
Log.d(TAG + "collect", integers.toString());
}
});
}
7)startWith()/startWithArray() 操作符
/**
* ========================startWith/startWithArray 操作符=================================
*
* 在一个被观察者发送时间前,追加发送一些数据/一个新的被观察者
*/
public static void startWith() {
Observable.just(7, 8, 9)
.startWith(6) //在发送序列去追加单个数据
.startWithArray(4, 5) //在发送序列去追加多个数据
.startWith(Observable.just(1, 2, 3)) //在发送序列去追加单个被观察者
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "startWith", String.valueOf(integer));
}
});
}
8)count() 操作符
/**
* ========================count 操作符=================================
*
* 统计被观察者发送事件数量
*/
public static void count() {
Observable
.just(1, 2, 3, 4)
.count()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "count", "发送事件的数量 : " + aLong);
}
});
}
4、功能操作符:辅助被观察者(Observable) 发送事件时实现一些功能性需求,如错误处理,线程调度。
1) subscribe() 操作符
/**
* ==================subscribe 操作符===========================
*
* 连接被观察者和观察者
*/
public static void subscribe() {
//创建被观察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext("事件");
}
});
//创建观察者
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG + "subscribe", "开始连接");
}
@Override
public void onNext(Object o) {
Log.d(TAG + "subscribe", "收到事件");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
//通过subscribe 进行 被观察者(Observable)与观察者(Observer)的连接
observable.subscribe(observer);
}
2) delay() 操作符
/**
* ==================delay 操作符=======================================
*
* 延迟发送事件
*
* delay有多个重载方法:
*
* delay(long delay,TimeUnit unit) :指定延迟时间。 参数一:时间 ; 参数二:时间单位
*
* delay(long delay, TimeUnit unit, Scheduler scheduler) 指定延迟时间&线程调度器。参数一:时间 ; 参数二:时间单位;参数三: 线程调度器
*
* delay(long delay, TimeUnit unit, boolean delayError) 指定延迟时间&线程调度器。参数一:时间 ; 参数二:时间单位;参数三: 是否错误延迟
*
* delay(long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) 指定延迟时间&线程调度器&错误延迟。参数一:时间 ; 参数二:时间单位;
* 参数三: 线程调度器; 参数四:是否错误延迟(若中间发生错误,是否如常执行,执行完在执行onError())
*/
public static void delay() {
Observable
.just(1, 2)
.delay(10, TimeUnit.SECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "delay", String.valueOf(integer));
}
});
}
3) do 系列操作符
/**
* ========================do 系列操作符 =======================================
*
* 在事件发送&接收的整个周期过程中进行操作。
*
* 如发送事件前的操作,发送事件后的回调请求
*
* do系列操作符包含以下:
*
* doOnEach() :当Observable每发送一次事件就会调用一次(包含onNext(),onError(),onComplete())
* doOnNext(): 执行 onNext()前调用
* doAfterNext(): 执行onNext()后调用
* doOnComplete():执行onComplete()前调用
* doOnError():执行 onError()前调用
* doOnTerminate(): 执行终止(无论正常发送完毕/异常终止)
* doFinally(): 最后执行
* doOnSubscribe() :观察者订阅是调用
* doOnUnScbscribe(): 观察者取消订阅时调用
*/
public static void dos() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new NullPointerException());
}
})
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
Log.d(TAG + "doOnEach", "doOnEach: " + String.valueOf(integerNotification.getValue()));
}
})
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "doOnNext", "doOnNext: " + String.valueOf(integer));
}
})
.doAfterNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "doAfterNext", "doAfterNext: " + String.valueOf(integer));
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG + "doOnComplete", "doOnComplete");
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG + "doOnError", "doOnError");
}
})
.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG + "doOnTerminate", "doOnTerminate");
}
})
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG + "doAfterTermi", "doAfterTerminate");
}
})
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG + "doOnSubscribe", "doOnSubscribe");
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG + "doFinally", "doFinally");
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "收到的数据: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
4) onErrorReturn() 操作符
/** ====================onErrorReturn() 操作符 ======================
*
* 可以捕获错误。遇到错误时,发送一个特殊事件,并且正常终止.注意后面的事件不会再发送
*/
public static void onErrorReturn() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("Throwable"));
emitter.onNext(3);
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
Log.e(TAG, "发生了错误: " + throwable.getMessage());
return 404;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
Log.d(TAG, e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
5) onExceptionResumeNext()/onErrorResumeNext() 操作符
/**
* ====================onExceptionResumeNext()/onErrorResumeNext() 操作符 ======================
*
* 遇到错误时发送一个新的Observable 。并且正常终止.注意原Observable后面的事件不会再发送
*
* 如果捕获Exception的话使用onExceptionResumeNext() ,捕获错误的用onErrorResumeNext()
*/
public static void onExceptionResumeNext() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new NullPointerException("NullPointerException"));
emitter.onNext(3);
}
}).onExceptionResumeNext(new Observable<Integer>() {
@Override
protected void subscribeActual(Observer<? super Integer> observer) {
observer.onNext(4);
observer.onNext(5);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
Log.d(TAG, e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
6) retry() 操作符
/**
* ====================retry() 操作符 ======================
*
* 作用是:出现错误时,让被观察者重新发送数据
* 注:若发送错误,则一直重新发送
*
* 有几个重载方法:
* retry() : 出现错误时,让被观察者重新发送数据。若错误一直发生,则一直重新发送
*
* retry(long time):与retry不同的书,若错误一直发生,被观察者则一直重新发送数据,但这持续重新发送有次数限制
*
* retry(Predicate predicate) : 出现错误时,根据指定逻辑(可以捕获到发生的错误)决定是否让被观察者重新发送数据
*
* retry(new BiPredicate<Integer, Throwable>):出现错误时,根据指定逻辑(可以捕获重发的次数和发生的错误)决定是否让被观察者重新发送数据
*
* retry(long time,Predicate predicate) : 出现错误时,根据指定逻辑(可以捕获到发生的错误)决定是否让被观察者重新发送数据。并且有持续重发的次数限制
*/
public static void retry() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("发生错误了"));
emitter.onNext(3);
}
})
.retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(Integer integer, Throwable throwable) throws Exception {
// interger 为重试次数 ,throwable 为捕获到的异常
Log.e(TAG + "retry", throwable.getMessage());
Log.e(TAG + "integer", "重试次数: " + integer);
//return true : 重新发送请求(若持续遇到错误,就持续重新发送)
//return false : 不重新发送数据 并且调用观察者的onError()方法结束
if (integer > 2)
return false;
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG + "retry", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
7) retryUntil() 操作符
/**
* ===================retryUntil() 操作符============================
*
* 发送事件遇到错误,指定规则是否重新发送。retry(Predicate predicate)。
*
* return true : 不重新发送请求,并且调用观察者的onError()方法结束
* return false : 重新发送数据(若持续遇到错误,就持续重新发送)
*/
public static void retryUntil() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("发生错误了"));
emitter.onNext(3);
}
})
.retryUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
//return true : 不重新发送请求,并且调用观察者的onError()方法结束
// return false : 重新发送数据(若持续遇到错误,就持续重新发送)
return false;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG + "retryUntil", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "retryUntil", "onComplete");
}
});
}
8) retryWhen() 操作符
/**
* ===================retryWhen() 操作符============================
*
* 遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者(Observable) & 发送事件
*/
public static void retryWhen() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onError(new Throwable("发送了错误"));
emitter.onNext(3);
}
})
//遇到Error时会回调
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
//1、若返回的Observable发送的事件 = Error ,则原始的Observable则不重新发送事件。该异常信息可在观察者的onError中获得
//return Observable.error(throwable);
//2、若返回的Observable发送的事件= Next事件(和next的内容无关),则原始的Observable重新发送事件(若持续遇到错误,则持续发送)
return Observable.just(5); //仅仅是作为一个触发重新订阅原被观察者的通知,什么数据并不重要,只有不是onComplete/onError事件
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG + "retryWhen", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
Log.d(TAG + "retryWhen", e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG + "retryWhen", "onComplete");
}
});
}
9) repeat() 操作符
/**
* ===============repeat() 操作符==============
*
* repeat操作符的作用是重复发射 observable的数据序列,可以使无限次也可以是指定次数.不传时为重复无限次
*/
public static void repeat() {
Observable
.just(1, 2, 3)
.repeat(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "repeat", String.valueOf(integer));
}
});
}
10)repeatWhen() 操作符
/**
* ===============repeatWhen() 操作符==============
*
* 将原始 Observable 停止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)
* ,以此决定是否重新订阅 & 发送原来的 Observable
*/
public static void repeatWhen() {
Observable
.just(1, 2, 4)
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Object o) throws Exception {
//若新被观察者(Observable)返回1个Complete()/ Error()事件,则不重新订阅 & 发送原来的 Observable
//Observable.empty() = 发送Complete事件,但不会回调观察者的onComplete()
return Observable.empty();
// return Observable.error(new Throwable("不再重新订阅事件"));
// 返回Error事件 = 回调onError()事件,并接收传过去的错误信息。
// 情况2:若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable
// return Observable.just(1);
// 仅仅是作为1个触发重新订阅被观察者的通知,发送的是什么数据并不重要,只要不是Complete() / Error()事件
}
});
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
11)debounce() 操作符
/**
* ===============debounce() 操作符==============
* <p>
* 一定的时间内没有操作就会发送事件(只会发送最后一次操作的事件)。
* <p>
* 以下的例子: 发送5个事件,每个事件间隔1秒。但是debounce限定了2秒内没有任何操作才会真正发送事件。所以只有最后一次满足条件,只能接收到事件 5
*/
public static void debounce() {
Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
.debounce(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "debounce", String.valueOf(aLong));
}
});
}
12)subscribeOn / ObserverOn 操作符
/**
* ===============subscribeOn() 操作符==============
* ===============observerOn() 操作符==============
* <p>
* <p>
* subscribeOn : 发送事件的线程
* observerOn: 接收事件的线程
* <p>
* 线程调度器:
* Schedulers.io(): 代表io操作的线程,通常用于网络,读写文件等io密集型的操作
* Schedulers.compucation(): 代表CPU计算密集型的操作,例如需要大量计算的操作
* Schedulers.newThread(): 代表一个常规的新线程
* AndroidSchedulers。mainThread(): 代表Android的主线程
*/
public static void subscribeOn_observerOn() {
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("事件");
Log.d(TAG + "subscribeOn_ObserverOn", "发送事件:" + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG + "subscribeOn_ObserverOn", "接收事件: " + Thread.currentThread().getName());
}
});
}
5、过滤操作符:用于将Observable发送的数据进行过滤和选择。让Observable返回我们所需要的数据。过滤操作符有buffer(),filter(),skip(),take(),skipLast(),takeLast(),throttleFirst(),distainctUntilChange()。
1)filter() 操作符
/**
* ========================filter() 操作符 ======================================
*
* 对被观察者发送的事件做过滤操作。只有符合筛选条件的事件才会被观察者所接收。
*
* return true : 继续发送
*
* return false : 不发送
*/
public static void filter() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 3; i++) {
emitter.onNext(i);
}
}
})
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
//return true : 继续发送
//return false : 不发送
return integer != 2; // 本例子过滤了 等于2的情况
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "filter", String.valueOf(integer));
}
});
}
2)distinct() 操作符
/**
* ========================distinct 操作符 ======================================
*
* 简单地说就是去重。发射的数据包含重复的,会将重复的筛选掉。也就是,它只允许还没有被发射过的数据通过,被观察者接收。发射完数据会自动调用onComplete()方法
*
* y以下代码:观察者只会接收到 : 1,2,3,5 四个数值
*/
public static void distinct() {
Observable
.just(1, 2, 3, 2, 3, 5)
.distinct()
.subscribe(
new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG + "distinct", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "distinct", "onComplete");
}
});
}
3)distinctUntilChanged() 操作符
/**
* ========================distinctUntilChanged 操作符 ======================================
*
* 过滤掉连续重复的事件
*/
public static void distinctUntilChanged() {
Observable.just(1, 2, 3, 1, 2, 3, 3, 4, 4)
.distinctUntilChanged()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, String.valueOf(integer));
}
});
}
4)skip(),skipLast()操作符
/**
* ========================skip,skipLast======================================
*
* skip 操作符是把Observable发射的数据过滤点掉前n项。而take操作符只能接收前n项。另外还有skipLast和takeLast则是从后往前进行过滤.接收完会调用onComplete()方法
*
*/
/**
* 以下代码输出: 3,4,5
*/
public static void skip() {
Observable
.just(1, 2, 3, 4, 5, 6, 7)
.skip(2) //过滤前2项
.skipLast(3) //过滤后3项
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG + "skip", "根据顺序过滤" + String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "skip", "onComplete");
}
});
//-------------------------------------------------------------------------------------------------
Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
.skip(1, TimeUnit.SECONDS) //过滤地1s发送的数据
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d(TAG + "skip", "根据事件过滤" + String.valueOf(aLong));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
5)take() 操作符
/**
* ====================== take 操作符===========================
* 只能接收两个事件
*/
public static void take() {
Observable
.just(1, 2, 3, 4, 5, 6, 7)
.take(2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG + "skip", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "skip", "onComplete");
}
});
}
6)takeLast() 操作符
/**
* =======================takeLast() 操作符 ======================
*
* 只能接收被观察者发送的最后几个事件
*/
public static void takeLast() {
Observable
.just(1, 2, 3, 4, 5, 6, 7)
.takeLast(2)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG + "skip", String.valueOf(integer));
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG + "skip", "onComplete");
}
});
}
7)elementAt() 操作符
/**
* ========================elementAt() 操作符 ======================================
*
* 只发射第n项数据.
* 一个参数和两个参数时:
* elementAt(第n项)
* elementAt(第n项,第N项不存在时默认值)
*
* n为负数时,报IndexOUtOfBoundExection。为正数但超过发射数据长度不会报异常会使用默认值代替
*/
public static void elementAt() {
Observable.range(1, 5)
.elementAt(6, 10)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "elementAt", String.valueOf(integer));
}
});
}
8)elementAtOrError()操作符
/**
* ==============elementAtOrError()===================================================
*
* 在elementAtError()的基础上,当出现越界情况(当获取位置的索引>事件序列的长度),即抛出异常
*/
public static void elementAtOrError() {
Observable.range(1, 5)
.elementAtOrError(6)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "elementAtOrErr", String.valueOf(integer));
}
});
}
9)ignoreElements() 操作符
/**
* ========================ignoreElements() 操作符 ======================================
*
* 不管发射的数据.只希望在它完成时和遇到错误时收到通知
*/
public static void ignoreElements() {
Observable
.range(0, 10)
.ignoreElements()
.subscribe(new Action() {
@Override
public void run() throws Exception {
Log.d(TAG + "ignoreEles", "完成了");
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG + "ignoreEles", "出错了");
}
});
}
10)ofType() 操作符
/**
* ========================ofType() 操作符 ======================================
*
* 通过数据的类型过滤数据,只发送指定类型数据。
*
* 以下代码观察者只接收到: 1,2
*/
public static void ofType() {
Observable.just("哈哈", 1, "嘻嘻", 2, 3.5)
.ofType(Integer.class)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "ofType", String.valueOf(integer));
}
});
}
11)throttleFirst()/throttleLast() 操作符
/**
* ========================throttleFirst()/throttleLast() 操作符 ======================================
*
* throttleFirst() 在某段时间内,只发送该段事件第一次事件
*
* throttleLast() 在某段时间内,只发送该段事件最后一次事件
*/
public static void throttleFirst() {
Observable.interval(300, TimeUnit.MILLISECONDS) //每个0.3秒发送一个事件
.throttleFirst(1, TimeUnit.SECONDS) //只接收每秒内发送的第一个数据
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "throttleFirst", String.valueOf(aLong));
}
});
}
12)sample()
/**
* ===================sample()=================================
*
* 在某段时间内,只发送该段时间内最新(最后)1次事件
*
* 与throttleLast类似
*/
public static void sample() {
Observable.interval(300, TimeUnit.MILLISECONDS) //每个0.3秒发送一个事件
.sample(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG + "sample", String.valueOf(aLong));
}
});
}
13)firstElement()/lastElement()
/**
* ========================firstElement()/lastElement()==========================
*
* 选取第一个元素/最后一个元素
*/
public static void firstElement() {
Observable.just(1, 2, 3)
.firstElement()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "firstElement", String.valueOf(integer));
}
});
}
public static void lastElement() {
Observable
.just(1, 2, 3)
.lastElement()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG + "lastElement", String.valueOf(integer));
}
});
}