過濾操作
通過上篇的學習,我們知道了如何去轉換一個Observable對象成我們想要的格式,但是我們想直接省去if else而拿到想要的資料,這個怎麼做呢?那麼我們就在這篇文章裡來系統的學習一下吧
- Debounce操作符
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i=;i<;i++) {
subscriber.onNext(i);
int time=;
if (i%==) {
time=;
}
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onCompleted();
}
//過濾掉不足300ms的
}).debounce(, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("SampleFilteringActivity", "integer:" + integer);
}
});
運作結果
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
debounce操作符是
在源Observable發射之後,在規定的時間内沒有别的結果産生,則把這個結果送出給訂閱者。由于最後一條資料沒有其他資料覆寫,是以訂閱者都可以收到
除了在時間範疇上進行控制,還可以根據函數來限流
Observable.just(, , , , , , , , , ).debounce(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(final Integer integer) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (integer%==) {
subscriber.onCompleted();
}
}
});
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("SampleFilteringActivity", "integer:" + integer);
}
});
運作結果
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
隻要能被4整除,訂閱者就可以收到。最後一個值情況例外
throttleWithTimeout在時間限流上功能跟debounce一緻
- Distinct操作符
Observable.just(, , , , , ).distinct().subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("SampleFilteringActivity", "integer:" + integer);
}
});
運作結果
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
功能就是
去重
如果是
distinctUntilChangedObserver
,
他不是完全過濾,隻是連續N個相同的資料,僅僅保留一個,後面的他就不管了
,例如
Observable.just(, , , , , , , , ).distinctUntilChanged().subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("SampleFilteringActivity", "integer:" + integer);
}
});
運作結果
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
下面介紹多點功能差不多的操作符:
- elementAt操作符
- take操作符
- first操作符
- last操作符
- takeFirst操作符
- takeLast操作符
- skip
-
skipLast
這裡簡單說明下
,elementAt是隻發射第N個元素
,take是隻發射前面N個元素
,first是隻發射第一個元素,等同于elementAt(0)
,last是隻發射最後一個元素
,takeFirst類似于take,也類似于first,也是隻發射前面N個元素。他和take的差別在于如果Observable沒有發射資料,take會抛出NoSuchElementException異常,而takeFirst不會,他會産生一個沒有onNext隻有onCompleted的空Observable
takeLast是隻發射最後N個元素
skip是跳過之前skip個元素,直接傳回後面的所有元素
skipLast是跳過最後的幾個元素,直接傳回之前的所有元素
- Filter操作符
Observable.just(, , , , , ).filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer>;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d("SampleFilteringActivity", "integer:" + integer);
}
});
運作結果
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
對源Observable發射出來的資料按照一定條件進行過濾
- OfType操作符
Observable.just("1", , ).ofType(String.class).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("SampleFilteringActivity", s);
}
});
運作結果
com.renyu.rxdemo D/SampleFilteringActivity:
類似于filter但是又不同,
他是按照資料類型進行過濾
,此處就是找出String類型的發射結果
- single操作符
Observable.just(, , , , , ).single(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
//取大于4的唯一一個元素,否則抛出異常
return integer>=;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Integer integer) {
Log.d("SampleFilteringActivity", "integer:" + integer);
}
});
運作結果
com.renyu.rxdemo D/SampleFilteringActivity: integer:
同樣是對源Observable發射出的資料進行判斷,如果傳回的過濾結果數量不是1,他就抛java.lang.IllegalArgumentException: Sequence contains too many elements
- ignoreElements
忽略掉所有的傳回結果,僅僅保留onError和onCompleted
- Sample
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i=;i<;i++) {
subscriber.onNext(""+i);
try {
Thread.sleep();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).sample(, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread()).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("SampleFilteringActivity", s);
}
});
運作結果
com.renyu.rxdemo D/SampleFilteringActivity:
com.renyu.rxdemo D/SampleFilteringActivity:
com.renyu.rxdemo D/SampleFilteringActivity:
com.renyu.rxdemo D/SampleFilteringActivity:
sample是
按照指定的時間間隔去定時掃描Observable發射出的最後一條資料
,通過下圖來說明
2.2s的時候正好資料2發射完成,4.4s的時候資料5完成,以此類推,最後一條資料9發出之後沒有發生改變,直到8.8s被調用顯示
主要參考文章
- 給 Android 開發者的 RxJava 詳解 扔物線大神的文章,我的RXJava啟蒙文章
- 木水川的部落格 對RXJava操作符做了很詳細的說明以及提供詳細的示例
- ReactiveX文檔中文翻譯 mcxiaoke翻譯的RXJava中文文檔
- Android RxJava使用介紹 job_hesc對RXJava操作符介紹以及簡單的示例