RxJava 在 GitHub 首頁上的自我介紹是 “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程式的庫)。
特點:簡潔
RJava能辦得到的事情,用handler+Thread或者AsyncTask都可以做到,而使用Rxjava能使異步操作代碼更簡潔。随着程式邏輯變得越來越複雜,它依然能夠保持簡潔。
因為使用一般的異步嵌套,會形成複雜的難易維護的代碼。而RxJava是一條從上到下的鍊式調用,無論多複雜得邏輯都可以用一條線表達,沒有嵌套。
友情提醒:看的過程中一定不要眨眼,看清楚這幾個字眼,很容易亂,不要搞錯。
Observable被觀察者
Observer觀察者
Subscriber觀察者
subscribe訂閱
OnSubscribe被觀察者内部成員
observeOn線程排程方法
subscribeOn線程排程方法
下面開始RxJava的學習:
首先是引入依賴:
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
RxJava是基于擴充的觀察者模式實作的。
用我們最熟悉的按鈕+監聽器來重制觀察者模式:
①Button —>按鈕,也稱被觀察者
②OnclickListener —>監聽者,也稱觀察者
③setOnClickListener() —>設定監聽者,也稱訂閱
④onClick() —>點選事件,也稱事件
說白了也就是:Button對象通過setOnClickListener(listener)持有OnClickListener對象引用,并且調用OnClickListener對象的回調方法onClick()。那也就是被觀察者調用觀察者的回調方法,實作了被觀察者向觀察者的事件傳遞,這就是觀察者模式。
在RxJava中,也有四個概念:
①Observable —>被觀察者
②Observer/Subscriber —>觀察者
③subscribe() —>訂閱
④onNext()/onError()/onCompleted()—>事件
-
與傳統觀察者模式不同, RxJava 的事件回調方法除了普通事件 onNext() (相當于 onClick() /
onEvent())之外,還定義了兩個特殊的事件:onCompleted() 和 onError()。
-
onCompleted(): 事件隊列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。RxJava
規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為标志。
- onError(): 事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。
在一個正确運作的事件序列中, onCompleted() 和 onError() 有且隻有一個,并且是事件序列中的最後一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在隊列中調用了其中一個,就不應該再調用另一個。
下面開始來講如何實作:
一.觀察者Observer/Subscriber的建立
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
Observer是接口,Subscriber是實作了Observer接口的一個抽象類,兩者的使用方式一模一樣,而Subscriber做了一些擴充,而實質上,Observer是會轉成一個Subscriber後再使用的。
Subscriber與Observer的差別(Subscriber多出兩個方法):
①onStart():事件還未發送之前被調用,如果對準備工作的線程有要求, onStart() 就不适用了,因為它總是在 subscribe 所發生的線程被調用,而不能指定線程。要在指定的線程來做準備工作,可以使用 doOnSubscribe() 方法。
②unsubscribe():用于取消訂閱,調用後,Subscriber 将不再接收事件。一般在這個方法調用前,可以使用 isUnsubscribed() 先判斷一下狀态。
二:被觀察者Observable的建立
①create()
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
可以看到這裡建立Observable的時候,傳入了一個OnSubscriber對象,它被生成的Observable所持有,當Observable被訂閱subscribe(subscriber)的時候,會調用OnSubscriber.call(subscriber)方法。也就是說訂閱的時候,subscribe(subscriber)傳入的subscriber對象參數,會從方法OnSubscriber.call(subscriber)傳進去執行。那也就是被觀察者調用觀察者的回調方法,實作了被觀察者向觀察者的事件傳遞,這就是觀察者模式。
②just(T…):将傳入的參數依次發出來。
③from(T[]) 或者from(Iterable):将傳入的數組或者Iterable拆分成具體對象,依次發出。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
可以看到just,from方法比較簡潔,其實這兩個方法都是基于create()方法的,也就是說,最終還是調用了create()生成的Observable對象。
這三個方法生成的Observable是完完全全等價的,當Observable對象被訂閱subscribe()的時候,将會依次調用:
onNext("Hello");
onNext("Hi");
onNext("Aloha");
onCompleted();
其實還有很多其他的生成Obervable的方法,這裡先不列舉。
三:subscribe()訂閱
訂閱代碼非常簡單
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
我們可以看看subscribe(subscriber)的核心代碼:
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
①subscriber首先會執行onStart()方法,這個在前面有提及到。
②執行OnSubscriber.call(subscriber)。
我們在create()的時候,就知道call調用的是這樣的代碼:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
也就是subscriber會在Observable裡面被回調onNext()事件。
③傳回Subscription ,為了友善unsubscribe()。
其實你會發現,真正的被觀察者不是Observable,而是Observable.OnSubscribe對象。而OnSubscribe是Observable的一個成員對象。
subscribe() 還支援不完整定義的回調:
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自動建立 Subscriber ,并使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動建立 Subscriber ,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動建立 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
Action0 是 RxJava 的一個接口,它隻有一個方法 call(),這個方法是無參無傳回值的;
Action1是 RxJava 的一個接口,它隻有一個方法 call(T param),這個方法也無傳回值,但有一個參數;
由于 onCompleted() 方法是無參無傳回值的,是以 可以用Action0 可以包裝。
由于 onNext(T obj) 和 onError(Throwable error) 也是單參數無傳回值的,是以可以用Action1包裝。
這裡給一個真是的例子
String[] names = ...;
Observable.from(names)
.subscribe(new Action1<String>() {
@Override
public void call(String name) {
Log.d(tag, name);
}
});
非常簡單,會依次列印出這個names數組。
四:Scheduler線程排程器
在 RxJava 的預設規則中,事件的發出和消費都是在同一個線程的。而觀察者模式本身的目的就是『背景處理,前台回調』的異步機制。
RxJava内置的Scheduler:
①Scheduler.immediate() :目前線程,相當于不指定線程。(預設)
②Scheduler.newThread():啟動新線程。
③Scheduler.io():I/O操作所使用的scheduler。内部實作用了一個無上限的線程池,會重用空閑線程,比newThread()效率高。
④Scheduler.computation():CPU密集計算使用的scheduler。
⑤AndroidSchedulers.mainThread():Android主線程scheduler。
使用subscriberOn()和observeOn對線程控制:
1.subscribeOn()
指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程(因為發生subscribe()訂閱的時候,會調用Observable.OnSubscribe的call方法)。或者叫做事件産生的線程。
2.observeOn()
指定 Subscriber 所運作的線程。或者叫做事件消費的線程。并且observeOn() 指定的是它之後的操作所在的線程。
不同于 observeOn() , subscribeOn() 的位置放在哪裡都可以,但它是隻能調用一次的。如果多次調用 subscribeOn() ,隻有第一次起作用,後面的都無效。observeOn()可以多次調用。
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
.subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
RxJava多次切換線程栗子:
Observable.just(, , , ) // IO 線程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新線程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 線程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主線程,由 observeOn() 指定
至于為什麼可以多次切換線程,在後面講解了lift()變換原理之後,再做解釋。
這樣就實作了:加載圖檔将會發生在 IO 線程,而設定圖檔則被設定在了主線程。
五:變換
将事件序列中的對象或整個序列進行加工處理,轉換成不同的事件或事件序列。
①map(一對一的轉換)
Observable.just("images/logo.png") // 輸入類型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 參數類型 String
return getBitmapFromPath(filePath); // 傳回類型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 參數類型 Bitmap
showBitmap(bitmap);
}
});
Func1 和 Action1 非常相似,是 RxJava 的一個接口,用于包裝含有一個參數的方法。 Func1 和 Action 的差別在于, Func1 包裝的是有傳回值的方法。
map() 方法将參數中的 String 對象轉換成一個 Bitmap 對象後傳回,而在經過 map() 方法後,事件的參數類型也由 String 轉為了 Bitmap。
相當于:
Observable.just(“images/logo.png”)
轉換成
Observable.just(bitmap)
②flatMap(一對多的轉換)
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
flatMap() 的原理是這樣的:
1. 使用傳入的事件對象建立一個 Observable 對象;
2. 并不發送這個 Observable, 而是将它激活,于是它開始發送事件;
3. 每一個建立出來的 Observable 發送的事件,都被彙入同一個 Observable ,而這個 Observable 負責将這些事件統一交給 Subscriber 的回調方法。
對于上面的例子,目的是把每一個學生的課程列印出來(一個學生有多個課程)。
可以看到flatMap方法使用傳入Student事件對象,建立并傳回Observable對象。多次執行flatMap後會傳回很多個Observable,最後每一個Observable都會被彙入到同一個新的Observable中,而這個 新的Observable 負責将這些事件統一交給 Subscriber 的回調方法;
③
④
⑤
…剩下的很多常用的操作符,以後再做補充…
傳統嵌套網絡請求,用rxjava可以鍊式表達:rxjava+retrofit
networkClient.token() // 傳回 Observable<String>,在訂閱時請求 token,并在響應後發送 token
.flatMap(new Func1<String, Observable<Messages>>() {
@Override
public Observable<Messages> call(String token) {
// 傳回 Observable<Messages>,在訂閱時請求消息清單,并在響應後發送請求到的消息清單
return networkClient.messages();
}
})
.subscribe(new Action1<Messages>() {
@Override
public void call(Messages messages) {
// 處理顯示消息清單
showMessages(messages);
}
});
六:變換原理 lift()
這些變換雖然功能各有不同,但實質上都是針對事件序列的處理和再發送。類型轉換都是通過lift()這個方法來實作的。
首先看lift()的核心源碼
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);//傳進來的subscriber會被進行一輪包裝(變換)成newSubscriber
newSubscriber.onStart();
onSubscribe.call(newSubscriber);//這個onSubscribe指的是原始Observable内的那個onSubscribe,不屬于目前正在建立的這個Observable。
}
});
}
可以看到lift()方法,傳入一個Operator參數,傳回一個Observable對象;這個傳回的Observable對象,是通過create生成的。
原始有一個Observable和一個OnSubscriber,經過lift()以後,又産生了一個新的Observable和一個新的OnSubscriber。那麼新的Observable被訂閱的時候,就會觸發新的OnSubscriber的call()方法,也就是上面代碼那一個call方法。在call方法裡面傳入的subscriber會被變換包裝成新的subscriber,緊接着先調用新的subscriber的onStart()方法,接着調用原始onSubscribe的call方法,在裡面正式調用新subscriber的回調方法。也就是說,新的Observable僅僅起一個代理的功能,最終的事件分發,還是在原始Observable中。(隻不過,在開發中,我們完全可以了解一個Observable被lift()後依舊是一個簡單的僅僅事件類型變換了的Observable)
舉一個栗子:
将事件中的 Integer 對象轉換成 String
observable.lift(new Observable.Operator<String, Integer>() {
@Override
public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
// 将事件序列中的 Integer 對象轉換為 String 對象
return new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
subscriber.onNext("" + integer);
}
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
};
}
});
以上代碼中Operator的call方法的調用,是在lift()調用産生新的Observable被訂閱的時候,觸發新的OnSubscriber.call(subscriber)方法裡,對subscriber進行變換和包裝的方法。
這一段代碼的原意是要把Observable<Integer>轉換成Observable<String>,表面上我們看到的,也以為是Observable<Integer>被轉換成了Observable<String>,以為裡面的資料類型從Integer變成了String。
真相是:
原始Observable<Integer>沒變,然後生成了新的Observable<String>并傳回,我們手中持有的是新的這個Observable<String>,而這個新的Observable<String>能夠接收String類型的subscriber<String>的訂閱,而訂閱觸發的新的OnSubscriber調用call方法,在裡面對subscriber<String>進行轉換包裝成subscriber<Integer>,接着用原始的OnSubscriber調用call,把這個包裝後的subscriber<Integer>進行事件分發(調用onNext()..onCompleted()),因為原始Observable<Integer>是Integer類型,它能夠處理這個類型的subscriber<Integer>。那麼在我們眼中,就好像是把事件參數類型Integer轉變成了String,而事實是subscriber的類型做了變換。這些新的Observable其實都起了一個代理的作用,subscriber僅僅是經過了他們,最後都會被傳到最原始的observable裡面,被它執行(回調)。這就給了我們一個假象。
七:線程多次切換原理
RaJava是可以多次切換線程的,為什麼?
因為observeOn()方法指定的是Subscriber觀察者的線程,然而這個Subscriber并不是subscribe(Subscriber)訂閱傳入的這個Subscriber,而是observeOn執行時那個Observable所對應的Subscriber。(根據lift()變換原理,我們知道,rxjava鍊式每次變換操作後,Observable對象會變化,那麼它持有Subscriber對象也會變化(由下級包裝上傳過來的),是經過轉換包裝的,這個Subscriber也就是通過目前Observable對象的onSubscriber對象調用call(Subscriber)方法傳入的Subscriber對象)
observeOn() 指定的是它之後的操作所在的線程。是以如果有多次切換線程的需求,隻要在每個想要切換線程的位置調用一次 observeOn() 即可。