Android函数响应式编程最新RxJava-基本用法
Android函数响应式编程最新RxJava-操作符入门(1)
Github地址
今天介绍一下过滤操作符和组合操作符
1.过滤操作符
过滤操作符用于过滤和选择Observable发射的数据序列,让Observable只返回满足我们条件的数据。过滤操作符有filter、elementAt、distinct、skip、take、ignoreElements、throttleFirst、throttlewithtimeout等等。
(1)filter
filter操作符是对Observable产生的结果自定义规则进行过滤,只有满足条件的才能提交给订阅者。
Observable.just(,,,).filter(new AppendOnlyLinkedArrayList.NonThrowingPredicate<Integer>() {
@Override
public boolean test(Integer integer) {
return integer>;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG,integer+"");
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
上面代码是把just里的值大于2的提交给订阅者,打印的LOG如下:
(2)distinct
distinct操作符用来去重,就是去掉重复的数据,也就是每次发射的数据不一样,而distinctUntilChanged是去掉连续重复的数据.
Observable.just(,,,,,,,).distinct().subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG,integer+"");
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
(3)skip 与 take
skip操作符是把Observable发射的数据过滤掉前N项,而take操作符只取前N项。
下面是skip的代码:
Observable.just(,,,,).skip().subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG,integer+"");
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
运行结果如下:
take操作符的代码我就不贴出来了 ,意思是相同的。
(4)ignoreElements
ignoreElements操作符是忽略Observable产生的结果,只走onComplete()和onError()方法。
Observable.just(,,,).ignoreElements().subscribe(new CompletableObserver() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e(TAG,"onSubscribe");
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG,"onError");
}
});
(5)throttleFirst
throttleFirst 操作符则会定期发这个时间里Observable发射的第一个。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i=;i<;i++){
e.onNext(i);
try {
Thread.sleep();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
e.onComplete();
}
}).throttleFirst(, TimeUnit.MILLISECONDS).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG,""+integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
每隔300ms,就取发射源里的第一个数据,打印如下:
2.组合操作符
组合操作符可以同时处理多个Observable,组合操作符有 mergestartwith,join,switch,concat,cmbineLastest等等
(1)startWith
startWith操作符是在发射的数据前插入一条数据。
Observable.just(,).startWith().subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG,""+integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
(2)merge
merge操作符是将多个Observable合并到一个中并发射,但是发射的数据是交错的,concat操作符是严格按照顺序合并发射的。
Observable<Integer> ob1=Observable.just(,).subscribeOn(Schedulers.computation());
Observable<Integer> ob2=Observable.just(,).subscribeOn(Schedulers.computation());
Observable.merge(ob1,ob2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG,""+integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});