过滤操作
通过上篇的学习,我们知道了如何去转换一个Observable对象成我们想要的格式,但是我们想直接省去if else而拿到想要的数据,这个怎么做呢?那么我们就在这篇文章里来系统的学习一下吧
- Debounce操作符
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i=;i<;i++) {
subscriber.onNext(i);
int time=;
if (i%==) {
time=;
}
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onCompleted();
}
//过滤掉不足300ms的
}).debounce(, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("SampleFilteringActivity", "integer:" + integer);
}
});
运行结果
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
debounce操作符是
在源Observable发射之后,在规定的时间内没有别的结果产生,则把这个结果提交给订阅者。由于最后一条数据没有其他数据覆盖,所以订阅者都可以收到
除了在时间范畴上进行控制,还可以根据函数来限流
Observable.just(, , , , , , , , , ).debounce(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(final Integer integer) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (integer%==) {
subscriber.onCompleted();
}
}
});
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("SampleFilteringActivity", "integer:" + integer);
}
});
运行结果
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
只要能被4整除,订阅者就可以收到。最后一个值情况例外
throttleWithTimeout在时间限流上功能跟debounce一致
- Distinct操作符
Observable.just(, , , , , ).distinct().subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("SampleFilteringActivity", "integer:" + integer);
}
});
运行结果
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
功能就是
去重
如果是
distinctUntilChangedObserver
,
他不是完全过滤,只是连续N个相同的数据,仅仅保留一个,后面的他就不管了
,例如
Observable.just(, , , , , , , , ).distinctUntilChanged().subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("SampleFilteringActivity", "integer:" + integer);
}
});
运行结果
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
下面介绍多点功能差不多的操作符:
- elementAt操作符
- take操作符
- first操作符
- last操作符
- takeFirst操作符
- takeLast操作符
- skip
-
skipLast
这里简单说明下
,elementAt是只发射第N个元素
,take是只发射前面N个元素
,first是只发射第一个元素,等同于elementAt(0)
,last是只发射最后一个元素
,takeFirst类似于take,也类似于first,也是只发射前面N个元素。他和take的区别在于如果Observable没有发射数据,take会抛出NoSuchElementException异常,而takeFirst不会,他会产生一个没有onNext只有onCompleted的空Observable
takeLast是只发射最后N个元素
skip是跳过之前skip个元素,直接返回后面的所有元素
skipLast是跳过最后的几个元素,直接返回之前的所有元素
- Filter操作符
Observable.just(, , , , , ).filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("SampleFilteringActivity", "integer:" + integer);
}
});
运行结果
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
对源Observable发射出来的数据按照一定条件进行过滤
- OfType操作符
Observable.just("1", , ).ofType(String.class).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("SampleFilteringActivity", s);
}
});
运行结果
com.renyu.rxdemo D/SampleFilteringActivity:
类似于filter但是又不同,
他是按照数据类型进行过滤
,此处就是找出String类型的发射结果
- single操作符
Observable.just(, , , , , ).single(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
//取大于4的唯一一个元素,否则抛出异常
return integer>=;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Integer integer) {
Log.d("SampleFilteringActivity", "integer:" + integer);
}
});
运行结果
com.renyu.rxdemo D/SampleFilteringActivity: integer:
同样是对源Observable发射出的数据进行判断,如果返回的过滤结果数量不是1,他就抛java.lang.IllegalArgumentException: Sequence contains too many elements
- ignoreElements
忽略掉所有的返回结果,仅仅保留onError和onCompleted
- Sample
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i=;i<;i++) {
subscriber.onNext(""+i);
try {
Thread.sleep();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).sample(, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread()).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("SampleFilteringActivity", s);
}
});
运行结果
com.renyu.rxdemo D/SampleFilteringActivity:
com.renyu.rxdemo D/SampleFilteringActivity:
com.renyu.rxdemo D/SampleFilteringActivity:
com.renyu.rxdemo D/SampleFilteringActivity:
sample是
按照指定的时间间隔去定时扫描Observable发射出的最后一条数据
,通过下图来说明
2.2s的时候正好数据2发射完成,4.4s的时候数据5完成,以此类推,最后一条数据9发出之后没有发生改变,直到8.8s被调用显示
主要参考文章
- 给 Android 开发者的 RxJava 详解 扔物线大神的文章,我的RXJava启蒙文章
- 木水川的博客 对RXJava操作符做了很详细的说明以及提供详细的示例
- ReactiveX文档中文翻译 mcxiaoke翻译的RXJava中文文档
- Android RxJava使用介绍 job_hesc对RXJava操作符介绍以及简单的示例