目錄
RxJava是什麼及RxJava優勢
RxJava原理
RxJava使用(建立、裝配、消費)
1、建立
Create
Just
From
Interval & IntervalRange
Range & RangeLong
defer
timer
Empty & Never & Error
2、裝配階段
轉換操作符--lift()
轉換操作符二
轉換操作符三
轉換操作符四
轉換操作符五
線程排程
篩選操作符
組合Observable操作符
錯誤處理
異步操作符
公用操作符
數學運算符
聚合操作符
公用操作符三—Do系列
Subject
Processor
關于背壓
RxJava的一些擴充
總結
RxJava是什麼及RxJava優勢
RxJava 并不是一們新語言,RxJava 是一個ReactiveX 在JVM平台上的實作:是一個通過使用可觀察序列來編寫異步和基于事件的程式的庫。
RxJava 最大的好處就是:在異步進行中,它能讓線程排程變得非常簡單。
其次就是鍊式調用,它能讓一個業務流更清晰,可讀性更強。
最後我們把鍊式調用和線程排程合并起來用,那個編碼展現簡直不要太爽。
舉個例子--找美女的影評
1、異步拉取某電影的評論
2、過濾掉不是女性的評論
3、按使用者等級從高到低排序
4、将美女的頭像和昵稱在清單裡顯示出來
private void loadData(String url) {
Observable.just(url)
.observeOn(Schedulers.io())
.map(new Function<String, Response>() {
@Override
public Response apply(String mS) throws Exception {
System.out.println(Thread.currentThread().getName() + "---" + mS);
Response response = null;
URL url = new URL(mS);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
conn.setReadTimeout(5000);
if (conn.getResponseCode() == 200) {
InputStream in = conn.getInputStream();
byte[] b = new byte[1024 * 512];
ByteArrayOutputStream baos = new ByteArrayOutputStream();
int len = 0;
while ((len = in.read(b)) > -1) {
baos.write(b, 0, len);
}
String msg = baos.toString();
response = new Gson().fromJson(msg, Response.class);
}
System.out.println(Thread.currentThread().getName() + "-response--" + response);
return response;
}
})
.flatMap(new Function<Response, ObservableSource<User>>() {
@Override
public ObservableSource<User> apply(Response mResponse) throws Exception {
return Observable.fromIterable(mResponse.getCmts());
}
})
.filter(new Predicate<User>() {
@Override
public boolean test(User mUser) throws Exception {
return mUser.gender == 2;
}
})
.toSortedList(new Comparator<User>() {
@Override
public int compare(User o1, User o2) {
return o2.getUserLevel() - o1.getUserLevel();
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new SingleObserver<List<User>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(List<User> mUsers) {
System.out.println(Thread.currentThread().getName() + "---" + mUsers);
mUserAdapter.setNewData(mUsers);
}
@Override
public void onError(Throwable e) {
}
});
}
舉個例子--釋出一個圖文貼子
1、文字檢查(假設也是耗時的)
2、先裁剪壓縮圖檔,
3、向雲服務申請上傳授權
4、上傳到雲服務,拿到雲位址
5、雲位址加文本一起走釋出接口

1、2、3并行執行,2和3都完成後執行4,1和4都完成後執行5
如果按照我們平常的套路去寫這裡的邏輯你會發現你要做很多的判斷,并且調用鍊也比較複雜。看一下RxJava怎麼去實作。
public static void main(String... arg) {
upload("localPath", "發一個簡單的貼子");
try {
Thread.sleep(15 * 1000);
} catch (InterruptedException mE) {
mE.printStackTrace();
}
}
private static void upload(String localPath, String content) {
Observable.zip(checkContent(content), uploadImage(localPath), new BiFunction<String, String, String>() {
@Override
public String apply(String mS, String mS2) throws Exception {
System.out.println("現在開始分布--" + Thread.currentThread().getName());
Thread.sleep(2000);
return "釋出成功";
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String mS) {
System.out.println(mS + "--" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
System.out.println("-Throwable-" + e.getMessage() + "--" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
}
});
}
private static Observable<String> uploadImage(String localPath) {
return Observable.zip(Observable.just(localPath).observeOn(Schedulers.io()).map(new Function<String, byte[]>() {
@Override
public byte[] apply(String path) throws Exception {
//裁剪壓縮後傳回byte
System.out.println("裁剪壓縮圖檔--" + Thread.currentThread().getName());
Thread.sleep(3000);
System.out.println("裁剪壓縮圖檔成功--" + Thread.currentThread().getName());
return new byte[0];
}
}), Observable.just("token").observeOn(Schedulers.io()).map(new Function<String, String>() {
@Override
public String apply(String mS) throws Exception {
System.out.println("拿雲端位址--" + Thread.currentThread().getName());
Thread.sleep(1000);
System.out.println("拿雲端位址成功--" + Thread.currentThread().getName());
return "http://www.baidu.com";
}
}), new BiFunction<byte[], String, String>() {
@Override
public String apply(byte[] mBytes, String mS) throws Exception {
System.out.println("壓縮成功,拿雲端位址都成功,現在上傳--" + Thread.currentThread().getName());
Thread.sleep(3000);
System.out.println("現在上傳成功,傳回雲端位址--" + Thread.currentThread().getName());
return mS;
}
});
}
private static Observable<String> checkContent(String content) {
return Observable.just(content).flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String mS) throws Exception {
System.out.println("文案檢查開始--" + Thread.currentThread().getName());
Thread.sleep(1000);
if (null == mS || mS.length() <= 0) {
System.out.println("文案檢查失敗--" + Thread.currentThread().getName());
throw new NullPointerException("content can not be null !");
}
System.out.println("文案檢查成功--" + Thread.currentThread().getName());
return Observable.just(mS);
}
});
}
RxJava原理
三個基本的元素
RxJava是基于觀察者模式的,首先我們認識一下RxJava 三個基本的元素
被觀察者(Observable)
觀察者(Observer)
訂閱(subscribe)
三者關系:
Observable. Subscribe(Observer)
三個基本事件
onNext() 發送該事件時,觀察者會回調 onNext() 方法
onError() 發送該事件時,觀察者會回調 onError() 方法,當發送該事件之後,其他事件将不會繼續發送
onComplete() 發送該事件時,觀察者會回調 onComplete() 方法,當發送該事件之後,其他事件将不會繼續發送
另外
onSubscribe(Disposable d); 在注冊時傳回一個Disposable可以用于反注冊
Observable5個基類簡述
Observable 0-n的資料流,不支援背壓
Flowable 0-n的資料流 支援響應流和背壓
Single 一個錯誤或者一個item流
Completable 隻有錯誤信号和完成信号
Maybe 沒有item,恰好一個item或者一個錯誤
有了這幾個基礎之後我們開始嘗試自己寫一個小例子。
public static void create() {
//被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
//我是一個發射器,用來發射事件給觀察者,
// onComplete和onError有且隻能有一個,都會結束,後面的事件都觀察不到
emitter.onNext("1");
emitter.onNext("2");
emitter.onComplete();
if (emitter.isDisposed()) {
} else {
emitter.onError(new Throwable("err"));
}
}
});
//觀察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(String mS) {
System.out.println("onNext-" + mS);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
//綁定
observable.subscribe(observer);
}
我們一步步看一下源碼
1、建立。做了一些判斷,最後為傳回一個ObservableCreate執行個體
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
2、建立一個觀察者,就是實作Observer<T>接口
3、綁定。ObservableCreate.subscribe(Observer),最終走ObservableCreate.subscribeActual(Observer);
//訂閱過程
public final void subscribe(Observer<? super T> observer) {
//做了一堆空判斷,勾子的處理等,最後會走這個方法
subscribeActual(observer);
}
//我們看一下ObservableCreate的subscribeActual方法
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
//create的入參ObservableOnSubscribe接口,用于回調發射器
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//執行個體化一個發射器,發射器持有觀察者
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
//把發射器通過create傳進來的接口回調過去
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}
4、發射事件
//發射事件 :發射器next一個事件的時候會調用觀察者的next()方法,觀察者就可以接收到事件
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
經過上邊的4步我們已經了解了RxJava是如何執行個體觀察者模式的。
還有一個最最重要的還沒有提到,那就是線程排程的問題。我們看到訂閱的時候都會被一個
這個我們在講完轉換之後展開。
RxJava使用(建立、裝配、消費)
1、建立
前面我們用Observable的create()方法建立了一個被觀察者,要傳入一個ObservableOnSubscribe接口在訂閱之後拿到一個發射器ObservableEmitter再去手動發射資料。
Create
create():建立一個自定義的Obserable,需要自己實作onNext()、onError(),onComplete()的邏輯;
RxJava提供了很多個更簡單的建立操作符。
Just
适用于:Flowable,Observable,Maybe,Single
作用:構造一個響應類型通過拿一個預先存在的對象且在訂閱時将該對象發射給下遊消費者。
它還有許多個重載的版本,差別在于接受的參數的個數不同,最少1個,最多10個
public static void just() {
Observable<String> just = Observable.just("1111", "2222", "3333");
just.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String mS) {
System.out.println(mS);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
From
fromIterable
适用于: Flowable,Observable
作用:周遊集合,逐個發事件
fromArray
适用于: Flowable,Observable
作用:周遊數組,逐個發事件
fromCallable
适用于:Flowable,Observable,Maybe,Single,Completable
作用:傳入一個Callable,在訂閱時執行call方法,把結果通過DeferredScalarDisposable(訂閱時觀察者的onSubscribe方法會回 調該執行個體)的complet()方法回傳
fromAction
适用于: Maybe,Completable
作用:傳入一個Action,在訂閱時執行run方法。
fromRunnable
适用于: Maybe,Completable
作用:傳入一個Runnable,在訂閱時執行run方法。
fromFuture
适用于: Flowable,Observable,Maybe,Single,Completable
作用:接收一個Future對象,會同步等待Future傳回的結果再發送資料,也可以設定等待逾時時間
Interval & IntervalRange
适用于: Flowable,Observable
作用:周期性生成一個無限的、永遠增長的數(長整型). IntervalRange 變體生成一個有限數量的數。
Range & RangeLong
适用于: Flowable,Observable
作用:發送一定範圍的事件序列。 Range()函數 生成 Integer ,rangeLong() 生成 Long.
defer
适用于: Flowable,Observable,Maybe,Single,Completable
作用:直到有觀察者訂閱時才建立Observable,并且為每個觀察者建立一個新的Observable。
defer操作符會一直等待直到有觀察者訂閱它,然後它使用Observable工廠方法生成一個Observable。
timer
适用于: Flowable,Observable,Maybe,Single,Completable
作用:建立一個在給定的時間段之後傳回一個特殊值的Observable,它在延遲一段給定的時間後發射一個簡單的數字0
Empty & Never & Error
适用于: Flowable,Observable,Maybe,Completable
empty:這種類型原在訂閱時立馬發出完成的信号。
never:這種類型原不發出onNext,onSuccess,onError或者onComplete信号。 這種類型的響應源 在測試或者在組合操作符中禁用确切的源非常有用
error:對消費者發出一個錯誤信号,要麼已有,要麼通過一個java.util.concurrent.Callable 生成。
2、裝配階段
裝配階段主要用到之前提到的5個基類的操作符,由于操作符較多,我根據文檔給大家重點選了幾類。
- 轉換操作符
- 組合Observable操作符
- 異步和阻塞操作符
- 公用操作符
- 數學和集合操作符
轉換操作符--lift()
lift()
Lift方法是RxJava中所有操作符的基礎,可以通過它做各種各樣的變化。弄清楚它的原理,也友善我們了解其他操作符
了解起來就兩個字----代理。
//由觀察String轉換到觀察Student
public static void lift(final String name) {
Observable<String> nameObservable = Observable.just(name);
Observable<Student> observable = nameObservable.lift(new ObservableOperator<Student, String>() {
@Override
public Observer<String> apply(final Observer<? super Student> observer) throws Exception {
Observer<String> nameObserver = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(String mS) {
observer.onNext(new Student(name, 18));
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
};
return nameObserver;
}
});
}
1、建立一個觀察源為String類型的Observable<String>
2、實作ObservableOperator接口
public interface ObservableOperator<Downstream, Upstream> {
//傳入一個觀察者Downstream,這個觀察者是我們訂閱的觀察者,對應的類型是Student
//傳回一個觀察者Upstream,這個觀察者要我們自己生成,對應的類型是源始的類型String,Upstream持有Downstream
//Upstream走Next的時候的拿到一個類型是String的事件,我們用這個String生成對應的Student事件再調用Downstream的Next方法
@NonNull
Observer<? super Upstream> apply(@NonNull Observer<? super Downstream> observer) throws Exception;
}
3、lift()生成一個新的ObservableLift<Student, String>,它持有Observable<String>和ObservableOperator
4、ObservableLift<Student, String>.subscribe(Observer<Student> downstream)
5、ObservableLift<Student, String>.subscribeActual(Observer<Student> downstream)
Observer<String> upstream= ObservableOperator.apply(downstream)
source.subscribe(upstream)
6、upstream.next(String)-->downstream.next(new Student(String))
本來被觀察者Observable<String>應該被downstream觀察的,但是他們兩的類型不相同,是以中間實作了另外一個觀察者upstream,他和觀察源的類型保持了一緻。在中間層的觀察者upstream拿到事件後,做了一層轉換,最後回調給downstream。
明白了轉換之後我們就大概猜到線程切換是怎麼做的了。
轉換操作符二
Map操作符對原始Observable發射的每一項資料應用一個你選擇的函數,然後傳回一個發射這些結果的Observable
Cast操作符将原始Observable發射的每一項資料都強制轉換為一個指定的類型(多态),然後再發射資料,它是map的一個特殊版本:
适用于一對一轉換,當然也可以配合flatmap進行适用
private static void map() {
Observable.just("小明", "小紅").map(new Function<String, Student>() {
@Override
public Student apply(String mS) throws Exception {
return new Student(mS, 18);
}
}).subscribe(new Consumer<Student>() {
@Override
public void accept(Student mStudent) throws Exception {
System.out.println(mStudent);
}
});
}
轉換操作符三
flatMap:将一個發送事件的上遊Observable變換為多個發送事件的Observables,然後将它們發射的事件合并後放進一個單獨的Observable裡。需要注意的是, flatMap并不保證事件的順序。
concatMap:concatMap和flatMap一樣,差別在于能保證順序
flatMapIterable:将上流的任意一個元素轉換成一個Iterable對象。
多用于一對多,多對多。
/**
* 無序
*/
private static void test() {
Observable.range(1,10).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
}
轉換操作符四
buffer
用于将整個流按數量進行分組
private static void buffer() {
Observable.range(1, 10).buffer(3).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> mIntegers) throws Exception {
System.out.println(mIntegers);
}
});
}
結果:
[1, 2, 3]
[4, 5, 6]
[7, 8, 9]
[10]
groupBy
将整個流按條件分組
private static void groupBy() {
Observable.range(1, 5).groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer mInteger) throws Exception {
return mInteger % 2;
}
}).subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
@Override
public void accept(GroupedObservable<Integer, Integer> mIntegerIntegerGroupedObservable) throws Exception {
if (mIntegerIntegerGroupedObservable.getKey() == 0) {
mIntegerIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer mInteger) {
System.out.println("雙-" + mInteger);
}
});
} else {
mIntegerIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer mInteger) {
System.out.println("單-" + mInteger);
}
});
}
}
});
}
結果:
單-1
雙-2
單-3
雙-4
單-5
Scan
對原始Observable發射的每一項資料(第一項除外)都應用一個函數,計算出函數的結果z值,并将該值填 充回可觀測序列,等待和下一次發射的資料一起使用,比如做累加,或者找最大最小值
private static void scan() {
Observable.range(1, 5).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer mInteger, Integer mInteger2) throws Exception {
System.out.println("mInteger--" + mInteger + "--mInteger2--" + mInteger2);
return (mInteger > mInteger2 ? mInteger : mInteger2);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer mInteger) throws Exception {
System.out.println(mInteger);
}
});
}
結果:
1
mInteger--1--mInteger2--2
2
mInteger--2--mInteger2--3
3
mInteger--3--mInteger2--4
4
mInteger--4--mInteger2--5
5
window
window與buffer差別:window是把資料分割成了Observable,buffer是把資料分割成List
private static void window() {
Observable.range(1, 10).window(3).subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(Observable<Integer> mIntegerObservable) throws Exception {
System.out.println(mIntegerObservable);
mIntegerObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer mInteger) throws Exception {
System.out.println(mInteger);
}
});
}
});
}
結果:
[email protected]
1
2
3
[email protected]
4
5
6
[email protected]
7
8
9
[email protected]
10
(Observer<R> old)
轉換操作符五
As
可以将一種類型的Observable轉換成任意類型
public interface ObservableConverter<T, R> {
@NonNull
R apply(@NonNull Observable<T> upstream);
}
Observable.just("18").as(new ObservableConverter<String, Observable<Student>>() {
@Override
public Observable<Student> apply(Observable<String> upstream) {
return Observable.just(new Student("小明", 18));
}
}).subscribe(new Consumer<Student>() {
@Override
public void accept(Student mStudent) throws Exception {
}
});
Compose
可以通過它将一種類型的Observable轉換成另一種類型的Observable
public interface ObservableTransformer<Upstream, Downstream> {
@NonNull
ObservableSource<Downstream> apply(@NonNull Observable<Upstream> upstream);
}
Observable.just("18").compose(new ObservableTransformer<String, Student>() {
@Override
public ObservableSource<Student> apply(Observable<String> upstream) {
return Observable.just(new Student("小明", 18));
}
}).subscribe(new Consumer<Student>() {
@Override
public void accept(Student mStudent) throws Exception {
}
});
那麼as,compose與map flatMap比較?
不同點在于compose()操作符擁有更高層次的抽象概念:它操作于整個資料流中,不僅僅是某一個被發送的事件。具體如下:
compose()是一個能夠從資料流中得到原始Observable<T>的操作符,是以,那些需要對整個資料流産生作用的操作(比如,subscribeOn()和observeOn())需要使用compose()來實作。相較而言,如果在flatMap()中使用subscribeOn()或者observeOn(),那麼它僅僅對在flatMap()中建立的Observable起作用,而不會對剩下的流産生影響。
當建立Observable流的時候,compose()會立即執行,猶如已經提前寫好了一個操作符一樣,而flatMap()則是在onNext()被調用後執行,onNext()的每一次調用都會觸發flatMap(),也就是說,flatMap()轉換每一個事件,而compose()轉換的是整個資料流。
因為每一次調用onNext()後,都建立一個Observable,是以flatMap()的效率較低。事實上,compose()操作符隻在主幹資料流上執行操作。
線程排程
subscribeOn
指定被觀察者生産事件的線程,多次調用隻有第一次生效
observeOn
指定觀察響應事件的線程,多次切換都生效
RxJava内置了多個高度器
1、Schedulers.immediate() 目前線程 = 不指定線程 預設
2、AndroidSchedulers.mainThread() Android主線程 操作UI
3、Schedulers.newThread() 正常新線程 耗時等操作
4、Schedulers.io() io操作線程 網絡請求、讀寫檔案等io密集型操作
5、Schedulers.computation() CPU計算操作線程 大量計算操作
Observable.just(new Student("小明", 18))
.observeOn(Schedulers.io())
.subscribeOn(Schedulers.io())
.map(new Function<Student, String>() {
@Override
public String apply(Student mStudent) throws Exception {
System.out.println(Thread.currentThread().getName());
return mStudent.name;
}
})
.observeOn(Schedulers.newThread())
.map(new Function<String, Integer>() {
@Override
public Integer apply(String mS) throws Exception {
System.out.println(Thread.currentThread().getName());
return 100;
}
})
.observeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.newThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer mInteger) throws Exception {
System.out.println(Thread.currentThread().getName());
}
});
結果:
RxCachedThreadScheduler-2
RxNewThreadScheduler-2
RxComputationThreadPool-2
原理:
前面我們看了lift()操作符的原理,我們簡單看一下ObserveOn的源碼。
1、ObserveOn會傳回一個ObservableObserveOn的新的被觀察者
2、訂閱時還是和lift()一樣的套路,在subscribeActual裡生成了一個中間層的觀察者ObserveOnObserver。
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
篩選操作符
filter
用來根據指定的規則對源進行過濾
private static void fitter() {
Observable.range(1, 30).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer mInteger) throws Exception {
return mInteger % 10 == 0;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer mInteger) throws Exception {
System.out.println(mInteger);
}
});
}
結果:
10
20
30
elementAt & firstElement & lastElement
用來擷取指定位置的資料
private static void element() {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 50; i++) {
list.add(i);
}
Observable.fromIterable(list).elementAt(18).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer mInteger) throws Exception {
System.out.println(mInteger);
}
});
}
結果:
18
distinct & distinctUntilChanged
前進用來對源中的重複資料進行過濾,後者隻當相鄰的兩個元素相同的時候才會将它們過濾掉
Observable.range(1, 10).map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer mInteger) throws Exception {
return mInteger % 5;
}
}).distinct().subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer mInteger) throws Exception {
System.out.println(mInteger);
}
});
結果:
1
2
3
4
0
//重複的條件
Observable.just(new Student("小明", 19), new Student("小明", 10), new Student("小紅", 18), new Student("小明", 10)).distinctUntilChanged(new Function<Student, String>() {
@Override
public String apply(Student mStudent) throws Exception {
return mStudent.name;
}
}).subscribe(new Consumer<Student>() {
@Override
public void accept(Student mStudent) throws Exception {
System.out.println(mStudent);
}
});
結果
Student{name='小明', age=19}
Student{name='小紅', age=18}
Student{name='小明', age=10}
skip & skipLast & skipUntil & skipWhile
過濾掉資料的前n項,或者一段時間等
Observable.range(1, 10).skip(9).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer mInteger) throws Exception {
System.out.println(mInteger);
}
});
結果:
10
take & takeLast & takeUntil & takeWhile
按照某種規則進行選擇操作
Observable.range(1, 10).takeWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer mInteger) throws Exception {
return mInteger < 3;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer mInteger) throws Exception {
System.out.println(mInteger);
}
});
結果:
1
2
ignoreElements
過濾所有源Observable産生的結果,隻會把Observable的onComplete和onError事件通知給訂閱者
sample
定期掃描源Observable産生的結果,在指定的間隔周期内進行采樣
Observable.interval(100,TimeUnit.MILLISECONDS).sample(300,TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(Long mLong) throws Exception {
System.out.println(mLong);
}
});
結果:
1
5
7
throttleFirst & throttleLast & throttleLatest & throttleWithTimeout
指定的事件範圍内發射出來的第一個/最後一個資料
debounce
用來限制發射頻率的,它僅在過了一段指定的時間還沒發射資料時才發射一個資料
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
emitter.onNext(1); // skip
Thread.sleep(400);
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(100);
emitter.onNext(4); // deliver
Thread.sleep(605);
emitter.onNext(5); // deliver
Thread.sleep(510);
emitter.onComplete();
}
}).debounce(500, TimeUnit.MILLISECONDS).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer mLong) throws Exception {
System.out.println(mLong);
}
});
結果:
2
4
5
組合Observable操作符
´startwith指定的資料源的之前插入幾個資料
´merge&mergeArray可以讓多個資料源的資料合并起來交叉發射
´concat & concatArray & concatEager将多個Observable拼接起來按照傳入的Observable的順序進行發射,一個Observable沒有發射完畢之前不會發射另一個Observable裡面的資料
´zip & zipArray & zipIterable将多個Observable資料合并起來它隻發射與發射資料項最少的那個Observable一樣多的資料
´combineLatest會存儲每個Observable的最近的值的,任意一個Observable發射新值時都會觸發操作
´join類似于combineLatest操作符,但是join操作符可以控制每個Observable産生結果的生命周期,在每個結果的生命周期内,可以與另一個Observable産生的結果按照一定的規則進行合并
錯誤處理
onErrorReturn遇到錯誤時,發送1個特殊事件然後正常終止
onErrorResumeNext遇到錯誤時,發送1個新的Observable
onExceptionResumeNext遇到異常時,發送1個新的Observable
retry當出現錯誤時,讓被觀察者(Observable)重新發射資料
retryUntil
retrywhen遇到錯誤時,将發生的錯誤傳遞給一個新的被觀察者(Observable),并決定是否需要重新訂閱原始被觀察者(Observable)& 發送事件
異步操作符
´fromAction( )— 将一個Action轉換成一個調用action且當有Subscriver訂閱時發射它的結果的Observable
´fromCallable( )— 将一個Callable轉換成調用callable且發射它的結果或者異常的Observable。
´fromRunnable( )— 将一個Runnable轉換成Observable,這個Observable執行runable且發射它的結果當有訂閱者訂閱的時候
´runAsync( ) —傳回一個 StoppableObservable ,這個StoppableObservable 發射多個 action 當被一個特定的scheduler的action生成的時候。
公用操作符
´ObserveOn指定Observable在哪個排程器上發送通知給觀察者
´SubscribeOn指定Observable自身在哪個排程器上執行
´Timeout如果原始Observable過了指定的一段時長沒有發射任何資料,Timeout操作符會以一個onError通知終止這個Observable,或者繼續一個備用的Observable。
´Timestamp它将一個發射T類型資料的Observable轉換為一個發射類型為Timestamped的資料的Observable,每一項都包含資料的發射時間。
´delay( )— 從一個Obseravble按一定數量向将來轉移一些發射對象
´delaySubscription( )— 持有一個Subscriber的訂閱請求一段指定的時間在傳入它到源Observable之前。
´materialize( )— 把一個Observable轉換一系列通知。
´dematerialize( )—是Materialize的逆向過程
´timestamp( )— 為每一個被Observable發出的item附上時間戳
´serialize( )— 強制一個Observable執行序列化調用
´cache( )— 記住被Observable發射的item序列且為未來的訂閱者發射相同的序列。
´timeInterval( )— 在源Observable的兩個連續的發射之間暫停一段時間
´using( )— 建立一個與Observable相同生命周期的可支配的資源
´single( )— 如果Observable再發射一個item後完成了,那麼傳回這個item,否則抛出一個異常
´singleOrDefault( ) —如果Observable發射一個item後完成了,那麼傳回這個item,否則傳回預設item
´repeat( )— 建立一個重複發射特殊item或者item序列的 Observable
´repeatWhen( ) —建立一個重複發射特殊item或者item序列的 Observable,取決于第二個Observable的發射
數學運算符
´averageDouble
´averageFloat
´max
´min
´sumDouble
´sumFloat
´sumInt
´sumLong
聚合操作符
´Reduce和Scan類似Scan每次都輸出結果,Reduce輸出最終結果
´reduceWith傳回一個對象,然後發射資料時候都帶上這個對象
可以做收集資料,比如對象是List或者map。最後傳回這個對象
´count 用來統計源 Observable 完成的時候一共發射了多少個資料。
´Collect 和reduceWith類似,隻是第二個參數BiConsumer沒有傳回值
´collectInto和reduceWith類似第一個入參不用callback去生成
´toList 對發射的資料加到一個List裡最後傳回
´toSortedList對發射的資料進行排序後傳回
´toMap對發射資料用一個方法傳回某個資料的key最後傳回map
´toMultimap和toMap類似,差別是value是Collection
公用操作符三—Do系列
注冊一個Action,在某種條件下回調,可以在事件流的各個階段插入一個回調
´doOnEach( )—Observable發射每個item
´doOnNext( )— 就在Observable傳入onNext事件順流而下之前
´doAfterNext( ) —就在Observable傳入onNext事件順流而下之後
´doOnSubscribe( )—Observabler訂閱Observable
´doOnUnsubscribe( ) —Observable取消訂閱Observable
´doOnCompleted( )— 當Observable成功完成
´doOnError( )— 當Observable帶錯誤完成
´doOnTerminate( )— 在Observable結束之前,不管成功或是出錯
´finallyDo( )—當Observable 終止之後會被調用,無論是正常還是異常終止
´doAfterTerminate( )—Observable結束之後,不管成功或是出錯
Subject
´Subject在ReactiveX是作為observer和observerable的一個bridge或者proxy。因為它是一個觀察者,是以它可以訂閱一個或多個可觀察對象,同時因為他是一個可觀測對象,是以它可以傳遞和釋放它觀測到的資料對象,并且能釋放新的對象。
´ReplaySubject 可觀察到的所有資料(包括訂閱前和訂閱後的,訂閱前的數量可控)
´BehaviorSubject 訂閱前最後一個資料和訂閱後接收到的所有資料,相當于 ReplaySubject.createWithSize(1)
´PublishSubject可觀察訂閱後接收到的資料)
´AsyncSubject 僅可觀察接收到的最後一個資料
´SerializedSubject在并發情況下使用SerializedSubject,并發時隻允許一個線程調用onnext等方法
´UnicastSubject 僅支援訂閱一次的Subject
Processor
´Processor 和 Subject 的作用相同的,既是觀察者,也是被觀察者。Subject 不支援背壓,是 RxJava 1.x 繼承過來的,Processor 繼承 FlowableProcessor,支援背壓。
´AsyncProcessor不論何時訂閱,都隻發射最後一個資料,如果因為異常而終止,不會釋放任何資料,但是會向 Observer 傳遞一個異常通知。
´BehaviorProcessor發射訂閱之前的一個資料和訂閱之後的全部資料。如果訂閱之前沒有值,可以使用預設值。
´PublishProcessor從哪裡訂閱就從哪裡發射資料。
´ReplayProcessor無論何時訂閱,都發射所有的資料。
´SerializedProcessor其它 Processor 不要在多線程上發射資料,如果确實要在多線程上使用,用這個 Processor 封裝,可以保證在一個時刻隻在一個線程上執行。
´UnicastProcessor隻能有一個觀察者。
關于背壓
´什麼是背壓?
´被觀察對象發射item比操作符或者訂閱者消費更快的情況
´例如:想象一下用zip操作符去合并兩個無限的Observable,其中一個發射item的頻次是另一個的兩倍。一個zip操作符的實作必須維持一個不斷擴大的緩沖池來緩存被更快的Observable發射的item ,最終合并被慢的Observable發射的item。這将導緻RxJava占有大量的系統資源。
背壓的五種政策
´MISSING, onNext事件是在不進行任何緩沖或删除的情況下寫入的。相當于沒有指定背壓政策
´ERROR, 如果下遊無法跟上,導緻如果緩存池滿了,則會發出反向壓力異常信号。
´BUFFER,緩沖所有onNext的值,直到下遊消耗它。(可能為OOM)
´DROP, 如果下遊無法跟上,導緻如果緩存池滿了,丢棄将要放入緩存池中的資料。
´LATEST 與Drop政策一樣,如果緩存池滿了,丢棄将要放入緩存池中的資料,不同的是,不管緩存池的狀态如何,LATEST都會将最後一條資料強行放入緩存池中,來保證觀察者在接收到完成通知之前,能夠接收到Flowable最新發射的一條資料。
解決背壓
´增加緩沖區大小
´使用标準運算符批處理/跳過值
´由消費者控制流速
´flowable為運算符托管128個元素的預設緩沖區大小,可通過bufferSize()進行通路,這些元素可通過系統參數rx2.buffer-size全局覆寫。
´當下流消費Buffer的prefetch - (prefetch >> 2)+的時候才會重新緩存新資料(預設128的75%是96個,)
´下流控制:
1、onSubscribe 的參數類型不再是 Disposable,而是 Subscription,可以調用它的 cancel() 切斷觀察者與被觀察者之間的聯系。Subscription 還有一個 request(long n) 方法,用來向生産者申請可以消費的事件數量。這樣便可以根據本身的消費能力進行消費事件。
2、當調用了 request() 方法後,生産者便發送對應數量的事件供消費者消費。即生産者要求多少,消費者就發多少。
3、如果不顯式調用 request 就表示消費能力為 0。request 這個方法若不調用,下遊的 onNext 與 OnComplete 都不會調用。
demo裡有個針對背壓例子,具體請檢視下Example_4_Flowable這個類
RxJava的一些擴充
´Retrofit+RxJava+Okhttp
´RxBinding
´RxPermission
´RxBus
´RxLifecycle
總結
下面是一些比較常用的,會用這些基本上就夠用了
- 建立:just,from,Interval
- 線程排程:observerOn,subscribeOn
- 轉換:map,flatmap,compose
- 組合,zip,concat,merge等
- 公用:delay,doOn系列等
-
錯誤處理:onErrorReturn,
onErrorResumeNext,
onExceptionResumeNext,
retry,
retryUntil,
retrywhen,
- Subject
最後
基本提到的操作符都有例子:https://github.com/YangKee/RxJavaDemo