Android函數響應式程式設計最新RxJava-基本用法
Android函數響應式程式設計最新RxJava-操作符入門(1)
Github位址
今天介紹一下過濾操作符群組合操作符
1.過濾操作符
過濾操作符用于過濾和選擇Observable發射的資料序列,讓Observable隻傳回滿足我們條件的資料。過濾操作符有filter、elementAt、distinct、skip、take、ignoreElements、throttleFirst、throttlewithtimeout等等。
(1)filter
filter操作符是對Observable産生的結果自定義規則進行過濾,隻有滿足條件的才能送出給訂閱者。
Observable.just(,,,).filter(new AppendOnlyLinkedArrayList.NonThrowingPredicate<Integer>() {
@Override
public boolean test(Integer integer) {
return integer>;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG,integer+"");
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
上面代碼是把just裡的值大于2的送出給訂閱者,列印的LOG如下:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiADNyEzLcd3LcJzLcJzdllmVldWYtl2Q3UCcpJHdz9CX05WZpJ3bt8Gd1F2LcJjcn9WTldWYtl2Pn5GcuAjYmRTMlF2N2kTOhVWY5QWLxkDO3gzNy8CXzV2Zh1WafRWYvxGc19CXvlmL1h2cuFWaq5ycldWYtlWLkF2bsBXdvw1LcpDc0RHaiojIsJye.png)
(2)distinct
distinct操作符用來去重,就是去掉重複的資料,也就是每次發射的資料不一樣,而distinctUntilChanged是去掉連續重複的資料.
Observable.just(,,,,,,,).distinct().subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG,integer+"");
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
(3)skip 與 take
skip操作符是把Observable發射的資料過濾掉前N項,而take操作符隻取前N項。
下面是skip的代碼:
Observable.just(,,,,).skip().subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG,integer+"");
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
運作結果如下:
take操作符的代碼我就不貼出來了 ,意思是相同的。
(4)ignoreElements
ignoreElements操作符是忽略Observable産生的結果,隻走onComplete()和onError()方法。
Observable.just(,,,).ignoreElements().subscribe(new CompletableObserver() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e(TAG,"onSubscribe");
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG,"onError");
}
});
(5)throttleFirst
throttleFirst 操作符則會定期發這個時間裡Observable發射的第一個。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i=;i<;i++){
e.onNext(i);
try {
Thread.sleep();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
e.onComplete();
}
}).throttleFirst(, TimeUnit.MILLISECONDS).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG,""+integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
每隔300ms,就取發射源裡的第一個資料,列印如下:
2.組合操作符
組合操作符可以同時處理多個Observable,組合操作符有 mergestartwith,join,switch,concat,cmbineLastest等等
(1)startWith
startWith操作符是在發射的資料前插入一條資料。
Observable.just(,).startWith().subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG,""+integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
(2)merge
merge操作符是将多個Observable合并到一個中并發射,但是發射的資料是交錯的,concat操作符是嚴格按照順序合并發射的。
Observable<Integer> ob1=Observable.just(,).subscribeOn(Schedulers.computation());
Observable<Integer> ob2=Observable.just(,).subscribeOn(Schedulers.computation());
Observable.merge(ob1,ob2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
Log.e(TAG,""+integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});