1.定義
RxJava
是一個 基于事件流、實作異步操作的庫
2.作用
用于實作異步操作,類似于
Android
中的
AsyncTask
、
Handler+
new Thread的作用
3. 特點
由于
RxJava
的使用方式是:基于事件流的鍊式調用,是以使得
RxJava
:
- 邏輯簡潔
- 實作優雅
- 使用簡單
更重要的是,随着程式邏輯的複雜性提高,它依然能夠保持簡潔 & 優雅
-
原理 基于 一種擴充的觀察者模式Rxjava
-
的擴充觀察者模式中有4個角色:Rxjava
角色 | 作用 | 類比 |
---|---|---|
被觀察者(Observable) | 産生事件 | 顧客 |
觀察者(Observer) | 接收事件,并給出響應動作 | 廚房 |
訂閱(Subscribe) | 連接配接 被觀察者 & 觀察者 | 服務員 |
事件(Event) | 被觀察者 & 觀察者 溝通的載體 | 菜式 |
4.使用
導包
implementation 'io.reactivex.rxjava2:rxjava:2.1.3'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'com.jakewharton.rxbinding2:rxbinding:2.2.0'
RxJava 執行個體1.定義2.作用3. 特點4.使用5.優雅實作其他方法observeOn:觀察者的執行線程subscribeOn:被觀察者的執行線程RXView 防抖

1.定義被觀察者
三種方式:Observable.create、Observable.just、Observable.fromArray
public void buyFood(){
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("potato");
e.onNext("tomato");
e.onNext("noodles");
e.onComplete();
}
});
Observable<String> observable1 = Observable.just("potato","tomato","noodles");
String [] foods = new String[]{"potato","tomato","noodles"};
Observable<String> observable2 = Observable.fromArray(foods);
}
2.定義觀察者
使用observer或者subscriber
Observer<String> mObserver = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
};
Subscriber<String> mStringSubscriber = new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
};
特别注意:2種方法的差別,即Subscriber 抽象類與Observer 接口的差別
// 相同點:二者基本使用方式完全一緻(實質上,在RxJava的 subscribe 過程中,Observer總是會先被轉換成Subscriber再使用)
// 不同點:Subscriber抽象類對 Observer 接口進行了擴充,新增了兩個方法:
// 1\. onStart():在還未響應事件前調用,用于做一些初始化工作
// 2\. unsubscribe():用于取消訂閱。在該方法被調用後,觀察者将不再接收 & 響應事件
// 調用該方法前,先使用 isUnsubscribed() 判斷狀态,确定被觀察者Observable是否還持有觀察者Subscriber的引用,如果引用不能及時釋放,就會出現記憶體洩露
3.訂閱
observable.subscribe(observer);
5.優雅實作
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("potato");
e.onNext("tomato");
e.onNext("noodles");
e.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
其他方法
observeOn:觀察者的執行線程
observeOn 作用于該操作符之後直到出現新的observeOn操作符
subscribeOn:被觀察者的執行線程
subscribeOn 作用于該操作符之前的 Observable 的建立操符作以及 doOnSubscribe 操作符 ,換句話說就是 doOnSubscribe 以及 Observable 的建立操作符總是被其之後最近的 subscribeOn 控制
執行個體
//将字元串轉換成double類型
String path = "12.3";
public void test(){
//輸入事件
Observable.just(path)
//對事件進行處理
.map(new Function<String, Double>() {
@Override
public Double apply(@NonNull String s) throws Exception {
return Double.parseDouble(s);
}
})
//指定被觀察者執行線程
.subscribeOn(Schedulers.io())
//指定觀察者執行線程
.observeOn(AndroidSchedulers.mainThread())
//訂閱觀察者
.subscribe(
new Observer<Double>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Double aDouble) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
Rxjava中的
Scheduler
相當于線程控制器,Rxjava通過它來指定每一段代碼應該運作在什麼樣的線程。Rxjava提供了5種排程器:
- .io()
- .computation()
- .immediate()
- .newThread()
- .trampoline()
另外,Android還有一個專用的
AndroidSchedulers.mainThread()
指定操作将在Android主線程運作。Rxjava通過
subscribeOn()
和
observeOn()
兩個方法來對線程進行控制,
subscribeOn()
指定
subscribe()
時間發生的線程,也就是事件産生的線程,
observeOn()
指定
Subscriber
做運作的線程,也就是消費事件的線程。
Schedulers.io() 用于I/O操作,比如:讀寫檔案,資料庫,網絡互動等等。行為模式和
newThread()
差不多,重點需要注意的是線程池是無限制的,大量的I/O排程操作将建立許多個線程并占用記憶體
Schedulers.computation()計算工作預設的排程器,這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。
Schedulers.immediate() 這個排程器允許你立即在目前線程執行你指定的工作。這是預設的
Scheduler
。
Schedulers.newThread() 它為指定任務啟動一個新的線程。
Schedulers.trampoline()
當我們想在目前線程執行一個任務時,并不是立即,我們可以用
trampoline()
将它入隊。這個排程器将會處理它的隊列并且按序運作隊列中每一個任務。
RXView 防抖
private void test3() {
RxView.clicks(控件).throttleFirst(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
Log.e(TAG, "onNext: 響應事件....." );
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
RxJava篇(應用場景) - 簡書 (jianshu.com)