RxJava2極速入門——Rxjava操作符詳解之過濾操作符
- RxJava操作符——過濾操作符
-
- elementAt
-
- 直接index索引
- 直接索引并設定預設角标
- elementAt(long index, T defaultItem) 與 elementAt(long index) 差別
- frist
- last
- IngoreElements
- filter
- distinct
-
- 基本使用
- 自定義key使用
- debounce
- take
-
- count用法
- time用法
- takeLast
-
- count的用法
- time的用法
- time and count用法
- skip
-
- count用法
- time用法
- skipLast
-
- count用法
- time用法
- 總結
RxJava操作符——過濾操作符
清明時節雨紛紛,路上行人欲斷魂。感慨一下,哪怕是祭奠先人,也别忘了學習,一周一更。正編開始:
在ReactiveX中轉換操作時這樣子描述的Operators that selectively emit items from a source Observable,其含義就是将需要發射的Observables通過使用Filtering Operators這種操作相關的函數修飾符過濾成真正需要發射的Observable。
過濾操作符常見分類如下:
elementAt
elementAt:emit only item n emitted by an Observable
elementAt隻擷取Observables中指定角标的Observable并發射。
在elementAt中有兩種用法:
直接index索引
原理圖如下:
從原理圖可以得出角标索引遵循index為0作為起始去搜尋指定的資料,然後當截取到資料後将它發射出去最後終止整個事件的訂閱。
是以傳入的index必須大于等于0,并且保證角标不能越界否則會出現運作異常。
示例代碼以及部分相關源碼如下:
private fun operatorsElementAt() {
var justIndexDispoable: Disposable? = null
Observable.rangeLong(1, 5).elementAt(0).subscribe(
getBaseLongMaybeObserver("operatorsElementAt-使用角标索引", {
justIndexDispoable = it
}, {
justIndexDispoable?.dispose()
})
)
}
/**
* 運作結果如下:
* com.ypz.rxjavademo I/operatorsElementAt-使用角标索引: onSubscribe
* com.ypz.rxjavademo I/operatorsElementAt-使用角标索引: onNext:value1
* 相關源碼如下
* */
/**
* Returns a Maybe that emits the single item at a specified index in a sequence of emissions from
* this Observable or completes if this Observable signals fewer elements than index.
* @param index
* the zero-based index of the item to retrieve
* @return a Maybe that emits a single item: the item at the specified position in the sequence of
* those emitted by the source ObservableSource
* @throws IndexOutOfBoundsException
* if {@code index} is less than 0
* @see <a href="http://reactivex.io/documentation/operators/elementat.html" target="_blank" rel="external nofollow" target="_blank" rel="external nofollow" target="_blank" rel="external nofollow" >ReactiveX operators documentation: ElementAt</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Maybe<T> elementAt(long index) {
if (index < 0) {
throw new IndexOutOfBoundsException("index >= 0 required but it was " + index);
}
return RxJavaPlugins.onAssembly(new ObservableElementAtMaybe<T>(this, index));
}
public final class ObservableElementAtMaybe<T> extends Maybe<T> implements FuseToObservable<T> {
final ObservableSource<T> source;
final long index;
public ObservableElementAtMaybe(ObservableSource<T> source, long index) {
this.source = source;
this.index = index;
}
tic final class ElementAtObserver<T> implements Observer<T>, Disposable {}
}
從示例代碼以及源碼可以得出 elementAt(long index) 這一用法得益于ObservableElementAtMaybe的實作,其通過繼承Maybe并實作FuseToObservable,在通過ElementAtObserver的實作,注意這裡的ElementAtObserver既為觀察者也為被觀察者一個中轉作用,通過這種作用過濾出相應角标Observable進行發射最後取消訂閱事件的
直接索引并設定預設角标
原理圖如下:
從原理圖可得當無法索引到相應角标的Observable,則通過所有defaultIndex去尋找相應的Observable。最終發射出去
示例代碼以及相關源碼如下:
private fun operatorsElementAt() {
var usedDefinedIndexDispoable: Disposable? = null
Observable.rangeLong(1, 5).elementAt(6, 3).subscribe(
getBaseLongSingleObserver("operatorsElementAt-預設角标用法",
{ usedDefinedIndexDispoable = it },
{ usedDefinedIndexDispoable?.dispose() })
)
}
/**
* 運作結果如下:
* com.ypz.rxjavademo I/operatorsElementAt-預設角标用法: onSubscribe
* com.ypz.rxjavademo I/operatorsElementAt-預設角标用法: onNext:value3
* 相關源碼如下
* */
/**
* Returns a Single that emits the item found at a specified index in a sequence of emissions from
* this Observable, or a default item if that index is out of range.
* @param index
* the zero-based index of the item to retrieve
* @param defaultItem
* the default item
* @return a Single that emits the item at the specified position in the sequence emitted by the source
* ObservableSource, or the default item if that index is outside the bounds of the source sequence
* @throws IndexOutOfBoundsException
* if {@code index} is less than 0
* @see <a href="http://reactivex.io/documentation/operators/elementat.html" target="_blank" rel="external nofollow" target="_blank" rel="external nofollow" target="_blank" rel="external nofollow" >ReactiveX operators documentation: ElementAt</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> elementAt(long index, T defaultItem) {
if (index < 0) {
throw new IndexOutOfBoundsException("index >= 0 required but it was " + index);
}
ObjectHelper.requireNonNull(defaultItem, "defaultItem is null");
return RxJavaPlugins.onAssembly(new ObservableElementAtSingle<T>(this, index, defaultItem));
}
public final class ObservableElementAtSingle<T> extends Single<T> implements FuseToObservable<T> {
final ObservableSource<T> source;
final long index;
final T defaultValue;
public ObservableElementAtSingle(ObservableSource<T> source, long index, T defaultValue) {
this.source = source;
this.index = index;
this.defaultValue = defaultValue;
}
@Override
public void subscribeActual(SingleObserver<? super T> t) {
source.subscribe(new ElementAtObserver<T>(t, index, defaultValue));
}
@Override
public Observable<T> fuseToObservable() {
return RxJavaPlugins.onAssembly(new ObservableElementAt<T>(source, index, defaultValue, true));
}
static final class ElementAtObserver<T> implements Observer<T>, Disposable {
/*内部源碼省略*/
}
}
從示例代碼以及源碼可以得出 elementAt(long index, T defaultItem) 這一用法得益于ObservableElementAtSingle的實作,其通過繼承single并實作FuseToObservable,在通過ElementAtObserver的實作,注意這裡的ElementAtObserver既為觀察者也為被觀察者一個中轉作用,通過這種作用過濾出相應角标Observable進行發射最後取消訂閱事件的
elementAt(long index, T defaultItem) 與 elementAt(long index) 差別
elementAt(long index)底層實作基于ObservableElementAtMaybe以及ObservableElementAtMaybe内部的ElementAtObserver,傳回的是Maybe,ObservableElementAtMaybe繼承Maybe。
elementAt(long index, T defaultItem)底層實作基于ObservableElementAtSingle以及ObservableElementAtSingle内部的ElementAtObserver,傳回的是Single,ObservableElementAtMaybe繼承Single。
兩者都實作了FuseToObservable接口。
frist
first:emit only the first item (or the first item that meets some condition) emitted by an Observable
從Observables中發射第一個(符合條件的元素)
原理圖如下:
從原理圖中可以看到first與**elementAt(long index, T defaultItem)**十分相似,他們是否就是一緻的呢請看接下來的代碼。
示例代碼以及相關源碼如下:
private fun operatorsFirst() {
var disposable: Disposable? = null
Observable.rangeLong(1, 5).first(1).subscribe(
getBaseLongSingleObserver("operatorsFirst",
{ disposable = it },
{ disposable?.dispose() }))
}
/**
* 運作結果如下:
* com.ypz.rxjavademo I/operatorsFirst: onSubscribe
* com.ypz.rxjavademo I/operatorsFirst: onNext:value1
* 源碼如下:
* */
/**
* Returns a Single that emits only the very first item emitted by the source ObservableSource, or a default item
* if the source ObservableSource completes without emitting any items.
*
* @param defaultItem
* the default item to emit if the source ObservableSource doesn't emit anything
* @return the new Single instance
* @see <a href="http://reactivex.io/documentation/operators/first.html" target="_blank" rel="external nofollow" target="_blank" rel="external nofollow" >ReactiveX operators documentation: First</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> first(T defaultItem) {
return elementAt(0L, defaultItem);
}
結合示例代碼以及源碼得出frist其實就是elementAt(long index, T defaultItem),就是換燙不換藥,但是first預設是擷取第一項起始角标。
last
last:emit only the last item (or the last item that meets some condition) emitted by an Observable
從Observables中發射最後一個(符合條件的元素)
原理圖如下:
private fun operatorsLast() {
var disposable: Disposable? = null
Observable.rangeLong(1, 5).last(1).subscribe(
getBaseLongSingleObserver("operatorsLast",
{ disposable = it },
{ disposable?.dispose() }))
}
/**
* 運作結果如下:
* com.ypz.rxjavademo I/operatorsLast: onSubscribe
* com.ypz.rxjavademo I/operatorsLast: onNext:value5
* 相關源碼如下:
* */
/**
* Returns a Single that emits only the last item emitted by this Observable, or a default item
* if this Observable completes without emitting any items.
* @param defaultItem
* the default item to emit if the source ObservableSource is empty
* @return a Single that emits only the last item emitted by the source ObservableSource, or a default item
* if the source ObservableSource is empty
* @see <a href="http://reactivex.io/documentation/operators/last.html" target="_blank" rel="external nofollow" target="_blank" rel="external nofollow" >ReactiveX operators documentation: Last</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> last(T defaultItem) {
ObjectHelper.requireNonNull(defaultItem, "defaultItem is null");
return RxJavaPlugins.onAssembly(new ObservableLastSingle<T>(this, defaultItem));
}
public final class ObservableLastSingle<T> extends Single<T> {
final ObservableSource<T> source;
final T defaultItem;
public ObservableLastSingle(ObservableSource<T> source, T defaultItem) {
this.source = source;
this.defaultItem = defaultItem;
}
// TODO fuse back to Observable
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
source.subscribe(new LastObserver<T>(observer, defaultItem));
}
static final class LastObserver<T> implements Observer<T>, Disposable {
//部分源碼省略
}
}
結合示例代碼以及源碼可以得出last的實作是基于ObservableLastSingle的實作,而ObservableLastSingle則通過繼承Single以及内部LastObserver的實作。
IngoreElements
IngoreElements:do not emit any items from an Observable but mirror its termination notification
原理圖如下:
結合原理圖可以得出:不發射任何資料隻關注與資料是否發送完成或者發送出現異常中斷
示例代碼以及源碼如下:
private fun operatorsIgnoreElements() {
Observable.rangeLong(1, 5).ignoreElements().subscribe(
{ logIMessage("operatorsIgnoreElements", "onComplete") },
{
logIMessage("operatorsIgnoreElements", "errorMessage:${it.message
?: "unkown error"}")
})
}
/**
* 運作結果如下:
* com.ypz.rxjavademo I/operatorsIgnoreElements: onComplete
* 相關源碼如下:
* */
/**
* Ignores all items emitted by the source ObservableSource and only calls {@code onComplete} or {@code onError}.
*
* @return the new Completable instance
* @see <a href="http://reactivex.io/documentation/operators/ignoreelements.html" target="_blank" rel="external nofollow" target="_blank" rel="external nofollow" >ReactiveX operators documentation: IgnoreElements</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable ignoreElements() {
return RxJavaPlugins.onAssembly(new ObservableIgnoreElementsCompletable<T>(this));
}
public final class ObservableIgnoreElementsCompletable<T> extends Completable implements FuseToObservable<T> {
final ObservableSource<T> source;
public ObservableIgnoreElementsCompletable(ObservableSource<T> source) {
this.source = source;
}
@Override
public void subscribeActual(final CompletableObserver t) {
source.subscribe(new IgnoreObservable<T>(t));
}
@Override
public Observable<T> fuseToObservable() {
return RxJavaPlugins.onAssembly(new ObservableIgnoreElements<T>(source));
}
static final class IgnoreObservable<T> implements Observer<T>, Disposable {
@Override
public void onNext(T v) {
// deliberately ignored
}
}
}
結合示例代碼以及源碼可以看到IngoreElements底層是是基于ObservableIgnoreElementsCompletable的實作,而ObservableIgnoreElementsCompletable通過繼承Completable并實作FuseToObservable接口,而其内部的IgnoreObservable這一類對onNext實作是一個nothing to do做法,專注于onComple以及onError的處理所實作的一個結果。
filter
filter:emit only those items from an Observable that pass a predicate test
原理圖如下:
結合原理圖可得:filter是将Observables進行條件過濾然後發射出去。
示例代碼以及源碼如下:
private fun operatorsFilter() {
Observable.rangeLong(1, 10).
filter { (it % 2 == 0L) }.
subscribe(ShowMessageBaseObserver<Long>("operatorsFilter"))
}
/**
* 運作結果如下:
* com.ypz.rxjavademo I/operatorsFilter: onSubscribe
* com.ypz.rxjavademo I/operatorsFilter: onNext:value2
* com.ypz.rxjavademo I/operatorsFilter: onNext:value4
* com.ypz.rxjavademo I/operatorsFilter: onNext:value6
* com.ypz.rxjavademo I/operatorsFilter: onNext:value8
* com.ypz.rxjavademo I/operatorsFilter: onNext:value10
* com.ypz.rxjavademo I/operatorsFilter: onComplete
* 源碼如下:
* */
/**
* Filters items emitted by an ObservableSource by only emitting those that satisfy a specified predicate.
*
* @param predicate
* a function that evaluates each item emitted by the source ObservableSource, returning {@code true}
* if it passes the filter
* @return an Observable that emits only those items emitted by the source ObservableSource that the filter
* evaluates as {@code true}
* @see <a href="http://reactivex.io/documentation/operators/filter.html" target="_blank" rel="external nofollow" target="_blank" rel="external nofollow" >ReactiveX operators documentation: Filter</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> filter(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}
public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> {
final Predicate<? super T> predicate;
public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {
super(source);
this.predicate = predicate;
}
@Override
public void subscribeActual(Observer<? super T> observer) {
source.subscribe(new FilterObserver<T>(observer, predicate));
}
static final class FilterObserver<T> extends BasicFuseableObserver<T, T> {
final Predicate<? super T> filter;
FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) {
super(actual);
this.filter = filter;
}
@Override
public void onNext(T t) {
if (sourceMode == NONE) {
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return;
}
if (b) {
downstream.onNext(t);
}
} else {
downstream.onNext(null);
}
}
@Nullable
@Override
public T poll() throws Exception {
for (;;) {
T v = qd.poll();
if (v == null || filter.test(v)) {
return v;
}
}
}
}
}
public interface Predicate<T> {
/**
* Test the given input value and return a boolean.
* @param t the value
* @return the boolean result
* @throws Exception on error
*/
boolean test(@NonNull T t) throws Exception;
}
結合示例代碼以及源碼得出:filter底層是基于ObservableFilter的實作,通過對Predicate接口的實作,在onNext事件中反複複用Predicate進行過濾發送。
distinct
distinct:suppress duplicate items emitted by an Observable
過濾重複的元素保證發射的資料是唯一的
基本使用
其原理圖如下:
示例代碼以及源碼如下:
private fun operatorsDistinct() {
Observable.just(1L, 2L, 3L, 1L, 2L).distinct()
.subscribe(ShowMessageBaseObserver<Long>("operatorsDistinct-Easy用法"))
}
/**
* 運作結果:
* com.ypz.rxjavademo I/operatorsDistinct-Easy用法: onSubscribe
* com.ypz.rxjavademo I/operatorsDistinct-Easy用法: onNextValue:1
* com.ypz.rxjavademo I/operatorsDistinct-Easy用法: onNextValue:2
* com.ypz.rxjavademo I/operatorsDistinct-Easy用法: onNextValue:3
* com.ypz.rxjavademo I/operatorsDistinct-Easy用法: onComplete
* 相關源碼如下:
* */
/**
* Returns an Observable that emits all items emitted by the source ObservableSource that are distinct
* based on {@link Object#equals(Object)} comparison.
* Customizing the retention policy can happen only by providing a custom {@link java.util.Collection} implementation
* to the {@link #distinct(Function, Callable)} overload.
*
* @return an Observable that emits only those items emitted by the source ObservableSource that are distinct from
* each other
* @see #distinct(Function)
* @see #distinct(Function, Callable)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> distinct() {
return distinct(Functions.identity(), Functions.createHashSet());
}
public static <T> Callable<Set<T>> createHashSet() {
return (Callable)HashSetCallable.INSTANCE;
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <K> Observable<T> distinct(Function<? super T, K> keySelector, Callable<? extends Collection<? super K>> collectionSupplier) {
ObjectHelper.requireNonNull(keySelector, "keySelector is null");
ObjectHelper.requireNonNull(collectionSupplier, "collectionSupplier is null");
return RxJavaPlugins.onAssembly(new ObservableDistinct<T, K>(this, keySelector, collectionSupplier));
}
public final class ObservableDistinct<T, K> extends AbstractObservableWithUpstream<T, T> {
final Function<? super T, K> keySelector;
final Callable<? extends Collection<? super K>> collectionSupplier;
public ObservableDistinct(ObservableSource<T> source, Function<? super T, K> keySelector, Callable<? extends Collection<? super K>> collectionSupplier) {
super(source);
this.keySelector = keySelector;
this.collectionSupplier = collectionSupplier;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
Collection<? super K> collection;
try {
collection = ObjectHelper.requireNonNull(collectionSupplier.call(), "The collectionSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources.");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
source.subscribe(new DistinctObserver<T, K>(observer, keySelector, collection));
}
static final class DistinctObserver<T, K> extends BasicFuseableObserver<T, T> {
final Collection<? super K> collection;
final Function<? super T, K> keySelector;
DistinctObserver(Observer<? super T> actual, Function<? super T, K> keySelector, Collection<? super K> collection) {
super(actual);
this.keySelector = keySelector;
this.collection = collection;
}
@Override
public void onNext(T value) {
if (done) {
return;
}
if (sourceMode == NONE) {
K key;
boolean b;
try {
key = ObjectHelper.requireNonNull(keySelector.apply(value), "The keySelector returned a null key");
b = collection.add(key);
} catch (Throwable ex) {
fail(ex);
return;
}
if (b) {
downstream.onNext(value);
}
} else {
downstream.onNext(null);
}
}
}
}
結合示例代碼以及源碼得出:
首先在distinct()中,其實是使用到自定義Key一種封裝的實作,通過内部已經實作的Functions.createHashSet()去建立一個Hash,然後利用這一個HashSet對key進行比較是否資料重複了,然後去重發射資料,注意這種使用下value 等價于 key。
自定義key使用
原理圖如下:
示例代碼如下
private fun distinctKey() {
var times = 1L
Observable
.just(1L, 2L, 3L, 1L, 2L)
.distinct {
times += 1
times
}
.subscribe(ShowMessageBaseObserver<Long>("operatorsDistinct-Key用法"))
}
/**
* 運作結果如下:
* com.ypz.rxjavademo I/operatorsDistinct-Key用法: onSubscribe
* com.ypz.rxjavademo I/operatorsDistinct-Key用法: onNextValue:1
* com.ypz.rxjavademo I/operatorsDistinct-Key用法: onNextValue:2
* com.ypz.rxjavademo I/operatorsDistinct-Key用法: onNextValue:3
* com.ypz.rxjavademo I/operatorsDistinct-Key用法: onNextValue:1
* com.ypz.rxjavademo I/operatorsDistinct-Key用法: onNextValue:2
* com.ypz.rxjavademo I/operatorsDistinct-Key用法: onComplete
* */
結合示例代碼可以看到當使用Function去實作自定義Key的時候,因為key已經不在将onNext事件中Value看作為Key是以也就沒有達到類似簡單用法的去重效果,但是其去重是已經實作了。可見在distinct的去重效果是由key中value對比所得,實作這種自定義Key的用法時候必須使用統一的校準規則避免資料去重校檢出錯達不到預期效果。
debounce
debounce:suppress duplicate items emitted by an Observable
過濾發射資料過快的資料
原理圖如下:
示例代碼以及源碼如下:
private fun operatorsDebounce() {
Observable.create(ObservableOnSubscribe<Long> { emitter ->
if (emitter.isDisposed) return@ObservableOnSubscribe
for (index in 1L..7L) {
emitter.onNext(index)
Thread.sleep(index * 100L)
}
emitter.onComplete()
})
.debounce(500, TimeUnit.MILLISECONDS)
.subscribe(ShowMessageBaseObserver<Long>("operatorsDebounce"))
}
/**
* 運作結果如下:
* com.ypz.rxjavademo I/operatorsDebounce: onSubscribe
* com.ypz.rxjavademo I/operatorsDebounce: onNext:value6
* com.ypz.rxjavademo I/operatorsDebounce: onNext:value7
* com.ypz.rxjavademo I/operatorsDebounce: onComplete
* 源碼如下:
* */
/**
* Returns an Observable that mirrors the source ObservableSource, except that it drops items emitted by the
* source ObservableSource that are followed by newer items before a timeout value expires. The timer resets on
* each emission.
*
* @param timeout
* the length of the window of time that must pass after the emission of an item from the source
* ObservableSource in which that ObservableSource emits no items in order for the item to be emitted by the
* resulting ObservableSource
* @param unit
* the unit of time for the specified {@code timeout}
* @return an Observable that filters out items from the source ObservableSource that are too quickly followed by
* newer items
* @see <a href="http://reactivex.io/documentation/operators/debounce.html" target="_blank" rel="external nofollow" target="_blank" rel="external nofollow" >ReactiveX operators documentation: Debounce</a>
* @see #throttleWithTimeout(long, TimeUnit)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> debounce(long timeout, TimeUnit unit) {
return debounce(timeout, unit, Schedulers.computation());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableDebounceTimed<T>(this, timeout, unit, scheduler));
}
結合示例代碼以及源碼得出:
這種用法下其底層基于ObservableDebounceTimed的實作,其實作通過利用TimeUnit 進行時間控制,每發射一個資料後進入一個冷卻期間,冷卻期間不接收上遊的資料發射,進而過濾發射資料過快的事件。
take
take:emit only the first n items emitted by an Observable
count用法
原理圖如下:
示例代碼以及源碼如下:
private fun operatorsTakeCount() {
Observable.
rangeLong(1L, 5L).take(3).
subscribe(ShowMessageBaseObserver<Long>("operatorsTake_count"))
}
/**
* 運作結果如下:
* com.ypz.rxjavademo I/operatorsTake_count: onSubscribe
* com.ypz.rxjavademo I/operatorsTake_count: onNext:value1
* com.ypz.rxjavademo I/operatorsTake_count: onNext:value2
* com.ypz.rxjavademo I/operatorsTake_count: onNext:value3
* com.ypz.rxjavademo I/operatorsTake_count: onComplete
* 相關源碼如下:
* */
/**
* Returns an Observable that emits only the first {@code count} items emitted by the source ObservableSource. If the source emits fewer than
* {@code count} items then all of its items are emitted.
* @param count
* the maximum number of items to emit
* @return an Observable that emits only the first {@code count} items emitted by the source ObservableSource, or
* all of the items from the source ObservableSource if that ObservableSource emits fewer than {@code count} items
* @see <a href="http://reactivex.io/documentation/operators/take.html" target="_blank" rel="external nofollow" target="_blank" rel="external nofollow" target="_blank" rel="external nofollow" >ReactiveX operators documentation: Take</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> take(long count) {
if (count < 0) {
throw new IllegalArgumentException("count >= 0 required but it was " + count);
}
return RxJavaPlugins.onAssembly(new ObservableTake<T>(this, count));
}
public final class ObservableTake<T> extends AbstractObservableWithUpstream<T, T> {
final long limit;
public ObservableTake(ObservableSource<T> source, long limit) {
super(source);
this.limit = limit;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
source.subscribe(new TakeObserver<T>(observer, limit));
}
static final class TakeObserver<T> implements Observer<T>, Disposable {
}
}
結合示例代碼以及源碼得出:
count的單純是通過對count進行累加,注意是從1開始。當達到目标count值得時候中斷發射。
time用法
原理圖如下:
示例代碼以及源碼如下:
private fun operatorsTakeTime() =
Observable.
interval(1, TimeUnit.SECONDS).
take(3, TimeUnit.SECONDS).
subscribe(ShowMessageBaseObserver<Long>("operatorsTake_time"))
/**
* 運作結果如下:
* 第一次運作:
* com.ypz.rxjavademo I/operatorsTake_time: onSubscribe
* com.ypz.rxjavademo I/operatorsTake_time: onNext:value0
* com.ypz.rxjavademo I/operatorsTake_time: onNext:value1
* com.ypz.rxjavademo I/operatorsTake_time: onComplete
* 第二次運作:
* com.ypz.rxjavademo I/operatorsTake_time: onSubscribe
* com.ypz.rxjavademo I/operatorsTake_time: onNext:value0
* com.ypz.rxjavademo I/operatorsTake_time: onNext:value1
* com.ypz.rxjavademo I/operatorsTake_time: onNext:value2
* com.ypz.rxjavademo I/operatorsTake_time: onComplete
*
* */
/**
* Returns an Observable that emits those items emitted by source ObservableSource before a specified time runs
* out.
*
* @param time
* the length of the time window
* @param unit
* the time unit of {@code time}
* @return an Observable that emits those items emitted by the source ObservableSource before the time runs out
* @see <a href="http://reactivex.io/documentation/operators/take.html" target="_blank" rel="external nofollow" target="_blank" rel="external nofollow" target="_blank" rel="external nofollow" >ReactiveX operators documentation: Take</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> take(long time, TimeUnit unit) {
return takeUntil(timer(time, unit));
}
結合示例代碼以及源碼得出:
這一用法是基于timer以及takeUntil封裝實作所得,而takeUntil屬于條件與布爾操作符将在下一篇詳解。
這種用法的核心思維就是,擷取起始某個時間段内的資料,當超過時間片段時則不在擷取資料,将擷取到資料進行發送,當時間段已經超過的時間會自動中斷事件的發送。
從第一次和第二次運作結果不一緻更加驗證了這個道理,為什麼第一次發送的事件少了一次,因為程式并發運作的時間段并不是超精準的會存在微妙的誤差所緻。
takeLast
take:emit only the final n items emitted by an Observable
count的用法
原理圖如下:
示例代碼以及源碼如下:
private fun operatorsTakeLastCount() =
Observable.
rangeLong(1L, 5L).
takeLast(2).
subscribe(ShowMessageBaseObserver("operatorsTakeLast_count"))
/**
* 運作結果如下:
* com.ypz.rxjavademo I/operatorsTakeLast_count: onSubscribe
* com.ypz.rxjavademo I/operatorsTakeLast_count: onNext:value4
* com.ypz.rxjavademo I/operatorsTakeLast_count: onNext:value5
* com.ypz.rxjavademo I/operatorsTakeLast_count: onComplete
*
* */
/**
* Returns an Observable that emits at most the last {@code count} items emitted by the source ObservableSource. If the source emits fewer than
* {@code count} items then all of its items are emitted.
*
* @param count
* the maximum number of items to emit from the end of the sequence of items emitted by the source
* ObservableSource
* @return an Observable that emits at most the last {@code count} items emitted by the source ObservableSource
* @throws IndexOutOfBoundsException
* if {@code count} is less than zero
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> takeLast(int count) {
if (count < 0) {
throw new IndexOutOfBoundsException("count >= 0 required but it was " + count);
} else
if (count == 0) {
return RxJavaPlugins.onAssembly(new ObservableIgnoreElements<T>(this));
} else
if (count == 1) {
return RxJavaPlugins.onAssembly(new ObservableTakeLastOne<T>(this));
}
return RxJavaPlugins.onAssembly(new ObservableTakeLast<T>(this, count));
}
結合示例代碼以及源碼得出:
從源碼可以得到當count 的值大小傳回相應的結果,
當count :<0時,傳回運作異常
當count:0時,傳回ObservableIgnoreElements
當count :1時,傳回ObservableTakeLastOne
當count:>1時,傳回ObservableTakeLast
ObservableTakeLastOne與ObservableTakeLast差別在于對onNext以及一些事件實作方式有些差別,其本質都是截取最後指定位置的元素進行發射。
time的用法
原理圖如下:
示例代碼以及源碼如下:
private fun operatorsTakeLastTime() =
Observable.
intervalRange(0, 6, 0, 1, TimeUnit.SECONDS).
takeLast(3, TimeUnit.SECONDS).
subscribe(ShowMessageBaseObserver<Long>("operatorsTakeLast_time"))
/**
* 運作結果如下:
* com.ypz.rxjavademo I/operatorsTakeLast_time: onSubscribe
* com.ypz.rxjavademo I/operatorsTakeLast_time: onNext:value3
* com.ypz.rxjavademo I/operatorsTakeLast_time: onNext:value4
* com.ypz.rxjavademo I/operatorsTakeLast_time: onNext:value5
* com.ypz.rxjavademo I/operatorsTakeLast_time: onComplete
*
* */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.TRAMPOLINE)
public final Observable<T> takeLast(long time, TimeUnit unit) {
return takeLast(time, unit, Schedulers.trampoline(), false, bufferSize());
}
public final Observable<T> takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
if (count < 0) {
throw new IndexOutOfBoundsException("count >= 0 required but it was " + count);
}
return RxJavaPlugins.onAssembly(
new ObservableTakeLastTimed<T>(this, count, time, unit, scheduler, bufferSize, delayError));
}
結合示例代碼以及源碼得出:
time用法在于最後一個事件發送時,在發送前的一段時間段内所産生的事件會按照順序過濾出來并按照順序發射。
從源碼可得出其實time用法就是基于time and count的組合二次封裝,隻不過是count設定為1。
time and count用法
原理圖如下:
示例代碼以及源碼如下:
private fun operatorsTakeLastCountTime() =
Observable.
intervalRange(0, 6, 0, 1, TimeUnit.SECONDS).
takeLast(2, 3, TimeUnit.SECONDS).
subscribe(ShowMessageBaseObserver("operatorsTakeLast_count"))
/**
* 運作結果如下
* com.ypz.rxjavademo I/operatorsTakeLast_count: onSubscribe
* com.ypz.rxjavademo I/operatorsTakeLast_count: onNext:value4
* com.ypz.rxjavademo I/operatorsTakeLast_count: onNext:value5
* com.ypz.rxjavademo I/operatorsTakeLast_count: onComplete
* 相關源碼如下:
* */
public final Observable<T> takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
if (count < 0) {
throw new IndexOutOfBoundsException("count >= 0 required but it was " + count);
}
return RxJavaPlugins.onAssembly(
new ObservableTakeLastTimed<T>(this, count, time, unit, scheduler, bufferSize, delayError));
}
結合示例代碼以及源碼得出:
count大小可以看作是一個window的size。而time則為觀測收集資料的時間片段。
cout的時間片段等于time,則window可發射的size等于count的大小。
cout的時間片段小于time,則window可發射的size等于count的大小。
cout的時間片段大于time,則window可發射的size小于count的大小,并且會丢失資料。
skip
take:suppress the first n items emitted by an Observable
跳過指定條件内,第n位前産生的資料,采集n位後的資料
count用法
原理圖如下:
示例代碼以及源碼如下:
private fun operatorsSkip() {
Observable.rangeLong(1, 5).skip(3).subscribe(
ShowMessageBaseObserver<Long>("operatorsSkip-count")
)
}
/**
* 運作結果如下:
* com.ypz.rxjavademo I/operatorsSkip-count: onSubscribe
* com.ypz.rxjavademo I/operatorsSkip-count: onNext:value4
* com.ypz.rxjavademo I/operatorsSkip-count: onNext:value5
* com.ypz.rxjavademo I/operatorsSkip-count: onComplete
* 源碼如下:
* */
/**
* Returns an Observable that skips the first {@code count} items emitted by the source ObservableSource and emits
* the remainder.
*
* @param count
* the number of items to skip
* @return an Observable that is identical to the source ObservableSource except that it does not emit the first
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> skip(long count) {
if (count <= 0) {
return RxJavaPlugins.onAssembly(this);
}
return RxJavaPlugins.onAssembly(new ObservableSkip<T>(this, count));
public final class ObservableSkip<T> extends AbstractObservableWithUpstream<T, T> {
final long n;
public ObservableSkip(ObservableSource<T> source, long n) {
super(source);
this.n = n;
}
static final class SkipObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
long remaining;
Disposable upstream;
SkipObserver(Observer<? super T> actual, long n) {
this.downstream = actual;
this.remaining = n;
}
@Override
public void onNext(T t) {
if (remaining != 0L) {
remaining--;
} else {
downstream.onNext(t);
}
}
}
}
結合示例代碼以及源碼得出:
count用法:通過ObservableSkip的實作所得,其中跳過的關鍵在于ObservableSkip中SkipObserver的onNext的實作。通過累減進而達到skip的作用,并按照事件順序發射資料。
time用法
原理圖如下:
示例代碼以及源碼如下:
private fun operatorsSkipTime() {
Observable.interval(1, TimeUnit.SECONDS).take(5).skip(3, TimeUnit.SECONDS)
.subscribe(ShowMessageBaseObserver<Long>("operatorsSkip_time"))
}
/**
* 運作結果如下:
* com.ypz.rxjavademo I/operatorsSkip_time: onSubscribe
* com.ypz.rxjavademo I/operatorsSkip_time: onNext:value2
* com.ypz.rxjavademo I/operatorsSkip_time: onNext:value3
* com.ypz.rxjavademo I/operatorsSkip_time: onNext:value4
* com.ypz.rxjavademo I/operatorsSkip_time: onComplete
* 源碼如下:
* */
/**
* Returns an Observable that skips values emitted by the source ObservableSource before a specified time window
* elapses.
*
* @param time
* the length of the time window to skip
* @param unit
* the time unit of {@code time}
* @return an Observable that skips values emitted by the source ObservableSource before the time window defined
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> skip(long time, TimeUnit unit) {
return skipUntil(timer(time, unit));
}
結合示例代碼以及源碼得出:
time用法底層是基于條件布爾操作符skipUntil的實作所得,其中通過内部封裝的timer實作,進而達到在time片段不在觀測資料,當time片段失效時将觀測所得的資料按照事件順序進行發射。
skipLast
take:emit only the final n items emitted by an Observable
count用法
原理圖如下:
示例代碼以及源碼如下:
private fun operatorsSkipLast() {
Observable.rangeLong(1, 5).skipLast(3)
.subscribe(ShowMessageBaseObserver<Long>("operatorsSkipLast-count"))
}
/**
* 運作結果如下:
* com.ypz.rxjavademo I/operatorsSkipLast-count: onSubscribe
* com.ypz.rxjavademo I/operatorsSkipLast-count: onNext:value1
* com.ypz.rxjavademo I/operatorsSkipLast-count: onNext:value2
* com.ypz.rxjavademo I/operatorsSkipLast-count: onComplete
* 源碼如下:
* */
/**
* Returns an Observable that drops a specified number of items from the end of the sequence emitted by the
* source ObservableSource.
*
* @param count
* number of items to drop from the end of the source sequence
* @return an Observable that emits the items emitted by the source ObservableSource except for the dropped ones
* at the end
* @throws IndexOutOfBoundsException
* if {@code count} is less than zero
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> skipLast(int count) {
if (count < 0) {
throw new IndexOutOfBoundsException("count >= 0 required but it was " + count);
}
if (count == 0) {
return RxJavaPlugins.onAssembly(this);
}
return RxJavaPlugins.onAssembly(new ObservableSkipLast<T>(this, count));
}
public final class ObservableSkipLast<T> extends AbstractObservableWithUpstream<T, T> {
final int skip;
public ObservableSkipLast(ObservableSource<T> source, int skip) {
super(source);
this.skip = skip;
}
@Override
public void subscribeActual(Observer<? super T> observer) {
source.subscribe(new SkipLastObserver<T>(observer, skip));
}
static final class SkipLastObserver<T> extends ArrayDeque<T> implements Observer<T>, Disposable {
private static final long serialVersionUID = -3807491841935125653L;
final Observer<? super T> downstream;
final int skip;
Disposable upstream;
SkipLastObserver(Observer<? super T> actual, int skip) {
super(skip);
this.downstream = actual;
this.skip = skip;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (skip == size()) {
downstream.onNext(poll());
}
offer(t);
}
}
}
結合示例代碼以及源碼得出:
count用法:
當count<0時,會産生IndexOutOfBoundsException異常。
當count=1時,會直接傳回原事件流。
當count>1時,count==size()時候,會觸發downstream.onNext(poll())也就是直接事件流直接回調用onComplete()事件。其次通過使用offer将事件看成一個集合,然後利用**java.util.offer(T t)**這一方法實作跳過指定最後一位至最後一位前n位的事件發射
time用法
原理圖如下:
示例代碼以及源碼如下:
private fun operatorsSkipLastTime() {
Observable.interval(1, TimeUnit.SECONDS).take(6)
.skipLast(4, TimeUnit.SECONDS).subscribe(
ShowMessageBaseObserver<Long>("operatorsSkipLast-time"))
}
/**
* 運作結果如下:
* com.ypz.rxjavademo I/operatorsSkipLast-time: onSubscribe
* com.ypz.rxjavademo I/operatorsSkipLast-time: onNext:value0
* com.ypz.rxjavademo I/operatorsSkipLast-time: onNext:value1
* com.ypz.rxjavademo I/operatorsSkipLast-time: onComplete
* 相關源碼如下:
* */
/**
* Returns an Observable that drops items emitted by the source ObservableSource during a specified time window
* (defined on a specified scheduler) before the source completes.
*
* @param time
* the length of the time window
* @param unit
* the time unit of {@code time}
* @param scheduler
* the scheduler used as the time source
* @param delayError
* if true, an exception signalled by the current Observable is delayed until the regular elements are consumed
* by the downstream; if false, an exception is immediately signalled and all regular elements dropped
* @param bufferSize
* the hint about how many elements to expect to be skipped
* @return an Observable that drops those items emitted by the source ObservableSource in a time window before the
* source completes defined by {@code time} and {@code scheduler}
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
// the internal buffer holds pairs of (timestamp, value) so double the default buffer size
int s = bufferSize << 1;
return RxJavaPlugins.onAssembly(new ObservableSkipLastTimed<T>(this, time, unit, scheduler, s, delayError));
}
結合示例代碼以及源碼得出:
其實skipLast time用法就是将發射事件onComple調用前那段時間處于一個冷卻期間的時間片段不在收集資料,未觸發冷卻期間的時間片段一直收集資料并按照順序發射出去。
總結
elementAt兩種用法:elementAt(long index, T defaultItem) 與 elementAt(long index)分别繼承不同的Single、Maybe的差別以及實作方式
frist基于elementAt(long index, T defaultItem) 實作,專注于擷取第一項資料
last基于ObservableLastSingle實作方式,專注于擷取最後一項資料
IngoreElements基于ObservableIgnoreElementsCompletable實作,專注于事件是否發送完成以及是否觸發錯誤
filter基于ObservableFilter以及Predicate的實作,達到自定義過濾事件的條件器作用
distinct去重的作用有兩種實作方式,如果簡單用法不适用建議使用自定義Key的用法進而達到資料去重的結果
debounce作用于過濾資料發射過塊的作用基于ObservableDebounceTimed的實作
take兩種使用方式count和time;每一種方式實作有差別一種适應于沒有時間段的觀測一種适用于時間段的觀測
takeLast三種使用方式count和time、count與time組合;每一種方式實作有差別一種适應于沒有時間段的觀測一種适用于時間段的觀測剩下一種适用于時間段内觀察資料資料大小有關
skip兩種使用方式count和time;每一種方式實作有差別一種适應于沒有時間段的觀測一種适用于時間段的觀測
skipLast兩種使用方式count和time;每一種方式實作有差別一種适應于沒有時間段的觀測一種适用于時間段的觀測
需要區分出那種場景使用哪一種過濾方式,以及每一種實作底層類為什麼。作用是什麼。
點我代碼傳送歡迎star