不講 rxjava 和 retrofit 而是直接上手 2 了,因為 2 封裝的更好用的更多。
1. 觀察者模式
常見的 button 點選事件為例,button 是被觀察者,listener 是觀察者,setOnClickListener 過程是訂閱,有了訂閱關系後在 button 被點選的時候,監聽者 listener 就可以響應事件。
這裡的
button.setOnClickListener(listener)
看上去意思是被觀察者訂閱了觀察者(雜志訂閱了讀者),邏輯上不符合日常生活習慣。其實這是設計模式的習慣,不必糾結,習慣了這種模式就利于了解觀察者模式了。
2. RxJava 中的觀察者模式
-
:被觀察者(ble 結尾的單詞一般表示 可…的,可觀察的)Observable
-
:觀察者(er 結尾的單詞一般表示 …者,…人)Observer
-
:訂閱subscribe
首先建立 Observable 和 Observer,然後
observable.subscribe(observer)
,這樣 Observable 發出的事件就會被 Observer 響應。一般我們不手動建立 Observable,而是由 Retrofit 傳回給我們,我們拿到 Observable 之後隻需關心如何操作 Observer 中的資料即可。
不過為了由淺入深的示範,還是手動建立 Observable 來講解。
2.1 建立 Observable
常見的幾種方式,不常用的不寫了,因為我覺得這個子產品不是重點。
-
var observable=Observable.create(ObservableOnSubscribe<String> {...})
-
var observable=Observable.just(...)
-
var observable = Observable.fromIterable(mutableListOf(...))
2.1.1 create()
var observable2=Observable.create(object :ObservableOnSubscribe<String>{
override fun subscribe(emitter: ObservableEmitter<String>) {
emitter.onNext("Hello ")
emitter.onNext("RxJava ")
emitter.onNext("GoodBye ")
emitter.onComplete() }
})
ObservableOnSubscribe
和
ObservableEmitter
都是陌生人,這個要是詳細講涉及到源碼分析,東西可就多了(主要是我不熟悉),是以可以了解成 ObservableOnSubscribe 是用來幫助建立 Observable 的,ObservableEmitter 是用來發出事件的(這些事件在觀察者 Observer 中可以響應處理)。
emitter 一次發射了三個事件,然後調用了 onComplete() 這些在下面講觀察者 Observer 時還會提到,一并講解。
2.1.2 just
這句的效果等同于上面用 create 建立 observable,即 調用 3 次 onNext 後再調 onComplete。
2.1.3 fromIterable
這句的效果等同于上面用 create 建立 observable,即 調用 3 次 onNext 後再調 onComplete。
2.2 建立 Observer
val observer = object : Observer<String> {
override fun onComplete() {
Log.e("abc", "-----onComplete-----")
}
override fun onSubscribe(d: Disposable) {
Log.e("abc", "-----onSubscribe-----")
}
override fun onNext(t: String) {
Log.e("abc", "-----onNext-----$t")
}
override fun onError(e: Throwable) {
Log.e("abc", "-----onError-----$e")
}
}
//訂閱
observable.subscribe(observer)
log 列印情況:
-----onSubscribe-----
-----onNext-----Hello
-----onNext-----RxJava
-----onNext-----GoodBye
-----onComplete-----
可以看到,先是建立訂閱關系,然後根據前面 observable 的發射順序來列印 onNext,參數通過 onNext(t: String) 傳進來,最後調用 onComplete,多說一句,在 just 和 fromIterable 的情況下,沒有手動調用 Emitter,但是仍會先調用 onNext,最後調用 onComplete
2.3 Consumer 和 Action
這兩個詞意思分别是消費者(可以了解為消費被觀察者發射出來的事件)和行為(可以了解為響應被觀察者的行為)。對于 Observer 中的 4 個回調方法,我們未必都能用得到,如果隻需要用到其中的一部分,就需要 Consumer 和 Action 上場了。
有參數的
onSubscribe
、
onNext
、
onError
我們用 Consumer 來代替,無參的
onComplete
用 Action 代替:
2.3.1 subscribe(Consumer<? super T> onNext)
observable.subscribe(object :Consumer<String>{
override fun accept(t: String?) {
Log.e("abc", "-----onNext-----$t")
}
})
//列印
-----onNext-----Hello
-----onNext-----RxJava
-----onNext-----GoodBye
說明一下,如果 subscribe 中我們隻傳一個對象參數,那隻能是
subscribe(Consumer<? super T> onNext)
(onNext 方法),不能是 Action 或 Consumer<? super Throwable> onError、Consumer<? super Disposable> onSubscribe
注意:Consumer 中的回調方法名稱是 accept,差別于前面的 onNext
2.3.2 subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
帶有兩個 Consumer 參數,分别負責 onNext 和 onError 的回調。
observable.subscribe(object : Consumer<String> {
override fun accept(t: String?) {
Log.e("abc", "-----onNext-----$t")
}
}, object : Consumer<Throwable> {
override fun accept(t: Throwable?) {
Log.e("abc", "-----onError-----$e")
}
})
如果想要一個帶有兩個 Consumer 但是不是這種搭配(比如
subscribe(Consumer<? super T> onNext, Consumer<? super Disposable> onSubscribe)
),可以嗎?答案是:不行
2.3.3 subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)
帶有三個參數,分别負責onNext、onError和onComplete的回調。
observable.subscribe(object : Consumer<String> {
override fun accept(t: String?) {
Log.e("abc", "-----onNext-----$t")
}
}, object : Consumer<Throwable> {
override fun accept(t: Throwable?) {
Log.e("abc", "-----onError-----$e")
}
}, object : Action {
override fun run() {
Log.e("abc", "-----onComplete-----")
}
})
同樣,三個參數隻能有這一種搭配
注意:Action 中的回調方法名稱是 run,差別于前面的 onComplete
2.3.4 subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe)
這種情況和直接 new 出來的 Observer 效果一樣。
observable2.subscribe(object : Consumer<String> {
override fun accept(t: String?) {
Log.e("abc", "-----onNext-----$t")
}
}, object : Consumer<Throwable> {
override fun accept(t: Throwable?) {
Log.e("abc", "-----onError-----$e")
}
}, object : Action {
override fun run() {
Log.e("abc", "-----onComplete-----")
}
},object : Consumer<Disposable>{
override fun accept(t: Disposable?) {
Log.e("abc", "-----onSubscribe-----") }
})
3. 變換
在上面的例子中,Observable 發送的都是 String 類型的資料,是以在 Observer 中接收的也都是 String,現實開發中的資料多種多樣,而且有時候 Observable 提供的資料不是我們理想的情況,這種情況下就需要用到轉換操作符。
同樣我們隻講常用的:
3.1 map
比如我們想把上遊的 Int 類型的資料轉換成 String 可以這樣操作:
Observable.fromIterable(mutableListOf<Int>(1, 3, 5, 7, 8))
.map(object : Function<Int, String> {
override fun apply(t: Int): String {
return "zb$t"
}
})
.subscribe(object : Consumer<String> {
override fun accept(t: String?) {
Log.e("abc","-- $t --")
}
})
//Log日志
-- zb1 --
-- zb3 --
-- zb5 --
-- zb7 --
-- zb8 --
通過
map
操作符,Int 類型資料,到 Consumer 裡已經成了 String(這裡為了簡單的隻看資料就沒用 Observer 而改用 Consumer,兩者都可以)。這裡面用到了
Function
,它的第一個泛型是 Observable 中發射的資料類型,第二個泛型是我們想要裝換之後的資料類型,在 Function 的 apply 方法中手動完成資料的轉化。
示意圖:map 把圓的變成了方的。

3.2 flatMap
與 map 相似,不過 flatMap 傳回的是一個 Observable,也就是說 Function 的第二個泛型固定了,就是 Observable,這樣說不太好了解,看個例子:
假如現在有多個學生,每個學生有多個科目,每個科目考了多次試,現在要列印所有的分數。單單隻用 map 就不能直接搞定,試試吧
class Course(var name: String, var scores: MutableList<Int>)
class Student(var name: String, var courses: MutableList<Course>)
var stu1Course1 = Course("體育",mutableListOf(80, 81, 82))
var stu1Course2 = Course("美術",mutableListOf(63, 62, 60))
var stu1 = Student("StuA", mutableListOf(stu1Course1, stu1Course2))
var stu2Course1 = Course("音樂",mutableListOf(74, 77, 79))
var stu2Course2 = Course("希臘語",mutableListOf(90, 90, 91))
var stu2 = Student("StuB", mutableListOf(stu2Course1, stu2Course2))
Observable.just(stu1,stu2)
.map(object :Function<Student,MutableList<Course>>{
override fun apply(t: Student): MutableList<Course> {
return t.courses
}
})
.subscribe(object :Consumer<MutableList<Course>>{
override fun accept(t: MutableList<Course>?) {
for (item in t!!){
for (i in item.scores){
Log.e("abc","--->$i")
}
}
}
})
通過兩層 for 循環可以列印,這也是沒辦法的事,因為在 map 裡面隻能拿到 Course 集合。使用 flatMap 的情況是這樣的:
Observable.just(stu1, stu2)
.flatMap(object : Function<Student, ObservableSource<Course>> {
override fun apply(t: Student): ObservableSource<Course> {
return Observable.fromIterable(t.courses)
}
})
.flatMap(object : Function<Course, ObservableSource<Int>> {
override fun apply(t: Course): ObservableSource<Int> {
return Observable.fromIterable(t.scores)
}
})
.subscribe(object : Consumer<Int> {
override fun accept(t: Int?) {
Log.e("abc", "---> $t")
}
})
// log 列印
---> 80
---> 81
---> 82
---> 63
---> 62
---> 60
---> 74
---> 77
---> 79
---> 90
---> 90
---> 91
用了兩次 flatMap,鍊式調用比縮進式更清晰。這裡面的 flatMap 傳回值類型的是 ObservableSource 并不是我們在前面提到的 Observable,檢視 Observable 源碼可以看到,它繼承了 ObservableSource,是以這種多态用法是可以的。
另外在 apply 中傳回的
Observable.fromIterable(t.courses)
這一句不就是我們建立 Observable 的方式嗎?
簡單的說,map 是把 Observable 發射的資料變換一下類型,flatMap 是把資料中集合/數組中的每個元素再次通過 Observable 發射。
示意圖:faltMap 把一系列圓的通過一系列的 Observable 變成了一系列方的。
圖雖然畫的醜,但是我想意思比較明白了。
3.3 filter
filter
是過濾的意思,通過判斷是否符合我們想要的邏輯,來決定是否發射事件,隻有傳回 true 的事件才被發射,其他的被抛棄。還以上面的例子為例,假如我們隻想看 80 分以上的成績可以這樣過濾:
Observable.just(stu1, stu2)
.flatMap(object : Function<Student, ObservableSource<Course>> {
override fun apply(t: Student): ObservableSource<Course> {
return Observable.fromIterable(t.courses)
}
})
.flatMap(object : Function<Course, ObservableSource<Int>> {
override fun apply(t: Course): ObservableSource<Int> {
return Observable.fromIterable(t.scores)
}
})
.filter(object :Predicate<Int>{
override fun test(t: Int): Boolean {
return t > 80
}
})
.subscribe(object : Consumer<Int> {
override fun accept(t: Int?) {
Log.e("abc", "---> $t")
}
})
// log 列印
---> 81
---> 82
---> 90
---> 90
---> 91
注意,filter 裡面不是用 Function 了,而是 Predicate,這個單詞是“基于…”的意思,基于 t > 80,也就是選擇大于 80 分的成績。
4. 結合 Retrofit 使用
前面 3 小節講了很多,都是為了講清楚 RxJava 的整個工作流程,還沒涉及到線程切換。現實開發中更多的時候 Observable 是通過 Retrofit 傳回給我們的。Retrofit 是一個網絡請求架構,它基于 OkHttp3,做了更好的封裝,結合 RxJava 用慣了的話可以大大提到開發效率。還是一樣,我們隻看怎麼用,不涉及源碼解讀。
4.1 Retrofit 進行簡單的 Get 請求
implementation 'com.squareup.retrofit2:retrofit:2.6.2'
implementation 'com.squareup.retrofit2:converter-gson:2.6.2'
先引入依賴,然後我們請求一個知乎日報的新聞資料(點選檢視資料:https://news-at.zhihu.com/api/4/news/latest):
// ZhEntity
class ZhEntity {
var date: String? = null
var stories: MutableList<StoriesBean>? = null
var top_stories: MutableList<TopStoriesBean>? = null
class StoriesBean {
var image_hue: String? = null
var title: String? = null
var url: String? = null
var hint: String? = null
var ga_prefix: String? = null
var type: Int = 0
var id: Int = 0
var images: MutableList<String>? = null
}
class TopStoriesBean {
var image_hue: String? = null
var hint: String? = null
var url: String? = null
var image: String? = null
var title: String? = null
var ga_prefix: String? = null
var type: Int = 0
var id: Int = 0
}
}
// ApiService
import retrofit2.Call
import retrofit2.http.GET
import retrofit2.http.Url
interface ApiService {
@GET("news/latest")
fun getLatestNews(): Call<ZhEntity>
}
// 調用
val retrofit = Retrofit.Builder()
.addConverterFactory(GsonConverterFactory.create())
.baseUrl("https://news-at.zhihu.com/api/4/")
.build()
val service: ApiService = retrofit.create(ApiService::class.java)
val call: Call<ZhEntity> = service.getLatestNews()
call.enqueue(object : Callback<ZhEntity> {
override fun onFailure(call: Call<ZhEntity>?, t: Throwable?) {
Log.e("abc", "--> $t")
}
override fun onResponse(call: Call<ZhEntity>?, response: Response<ZhEntity>?) {
Log.e("abc", "-->${Gson().toJson(response?.body())}")
}
})
代碼有點多,分别解釋一下,ZhEntity 是實體類,ApiService 是一個接口,裡面用注解的方式定義了一個方法 getLatestNews,
@GET
表示 Get 請求,由此可以想象肯定有
@POST
,
@GET
裡面還有參數,這是請求位址 BaseUrl 後面的子檔案夾。
getLatestNews 函數傳回類型是 Call,這個是 Retrofit 定義用來請求網絡的。
第三段代碼,現實建立了一個 Retrofit 對象,
addConverterFactory(GsonConverterFactory.create())
是把接口傳回的 json 類型的資料轉換成實體類的類型,這個東西在
implementation 'com.squareup.retrofit2:converter-gson:2.6.2'
時被引入。
然後是一系列的 Call 調用 qnqueue 操作什麼的,看得出,沒有用 Rxjava 一樣可以完成網絡請求,而且代碼不複雜,好了,本文到此結束。
好吧,我在扯淡。繼續講,有人說不喜歡 url 被截成兩段,可以這樣修改,效果完全相同:
// ApiService
import retrofit2.Call
import retrofit2.http.GET
import retrofit2.http.Url
interface ApiService {
@GET
fun getLatestNews(@Url url:String): Call<ZhEntity>
}
// 調用
val retrofit = Retrofit.Builder()
.addConverterFactory(GsonConverterFactory.create())
.baseUrl("https://www.baidu.com")
.build()
val service: ApiService = retrofit.create(ApiService::class.java)
val call: Call<ZhEntity> = service.getLatestNews("https://news-at.zhihu.com/api/4/news/latest")
call.enqueue(object : Callback<ZhEntity> {
override fun onFailure(call: Call<ZhEntity>?, t: Throwable?) {
Log.e("abc", "--> $t")
}
override fun onResponse(call: Call<ZhEntity>?, response: Response<ZhEntity>?) {
Log.e("abc", "-->${Gson().toJson(response?.body())}")
}
})
baseUrl 還是要的,不過設定成其他值無所謂了,因為不會被請求。
4.2 Retrofit 結合 RxJava
啰嗦了這麼多,才講到這裡。抱歉水準有限,沒辦法用簡單的語言說清複雜的問題。
首先,引入依賴時多加一句對 RxJava 的支援:
implementation 'com.squareup.retrofit2:retrofit:2.6.2'
implementation 'com.squareup.retrofit2:converter-gson:2.6.2'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.6.2'
然後,我們的 getLatestNews 就可以直接傳回一個 Observable 了!
import io.reactivex.Observable
import retrofit2.http.GET
interface ApiService {
@GET("news/latest")
fun getLatestNews(): Observable<ZhEntity>
}
放心寫,不會報錯,有了 Observable,就好辦了,輕車熟路:
val retrofit = Retrofit.Builder()
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.baseUrl("https://news-at.zhihu.com/api/4/")
.build()
val service: ApiService = retrofit.create(ApiService::class.java)
val observable = service.getLatestNews()
observable.subscribeOn(Schedulers.newThread())
.subscribe(object : Observer<ZhEntity> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: ZhEntity) {
Log.e("abc","-->${Gson().toJson(t)}")
}
override fun onError(e: Throwable) {
Log.e("abc","-->$e")
}
})
除了 Observable 來源變了,其他與本文最早講的 RxJava 沒什麼不同。非要說不同,有一點,多了一句
subscribeOn(Schedulers.newThread())
,下面講講這個。
4.3 線程切換
-
:定義 Observable 發射事件所處的線程subscribeOn
-
:定義轉換/響應事件所處的線程(map、flatMap、Observer 等),可多次切換observeOn
線程切換比較常見,比如 子線程請求網絡資料主線程更新 UI,
subscribeOn
和
observeOn
有哪些線程可以選擇?它們又是怎樣使用的?我們先看一個例子:
Thread(object : Runnable {
override fun run() {
Log.e("abc","Thread目前線程:${Thread.currentThread().name}")
observable.subscribeOn(Schedulers.newThread())
.doOnNext(object :Consumer<ZhEntity>{
override fun accept(t: ZhEntity?) {
Log.e("abc","doOnNext目前線程:${Thread.currentThread().name}")
}
})
.observeOn(Schedulers.io())
.flatMap(object :Function<ZhEntity,ObservableSource<ZhEntity.StoriesBean>>{
override fun apply(t: ZhEntity): ObservableSource<ZhEntity.StoriesBean> {
Log.e("abc","flatMap目前線程:${Thread.currentThread().name}")
return Observable.fromIterable(t.stories)
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Observer<ZhEntity.StoriesBean> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
Log.e("abc","onSubscribe目前線程:${Thread.currentThread().name}")
}
override fun onNext(t: ZhEntity.StoriesBean) {
Log.e("abc","Observer目前線程:${Thread.currentThread().name}")
Log.e("abc", "-->${Gson().toJson(t)}")
}
override fun onError(e: Throwable) {
Log.e("abc", "-->$e")
}
})
}
}).start()
// log 列印
Thread目前線程:Thread-4
onSubscribe目前線程:Thread-4
doOnNext目前線程:RxNewThreadScheduler-1
flatMap目前線程:RxCachedThreadScheduler-1
Observer目前線程:main
Observer目前線程:main
Observer目前線程:main
這裡面隻有
doOnNext
沒講過,現在說說:每發送 onNext() 之前都會先回調這個方法,是以 doOnNext 和 Observable 的 subscribe(發射事件的方法)處于同一個線程。
從這個例子可以看出:
- Observable 和 Observer 建立訂閱關系是在目前線程中(Thread-4)
決定 Observable 發射事件所處的線程(即 Retrofit 請求網絡所線上程)
subscribeOn
- 第一次
決定 flatMap 所在的線程(RxCachedThreadScheduler-1)
observeOn
- 再次
決定 Observer 所線上程(Android 主線程 main)
observeOn
是以每次調用
observeOn
就會切換線程,并且決定的是接下來的變換/響應的線程。多說一句,多次設定 subscribeOn,隻有第一次生效。
線程可選值:
線 程 名 稱 | 說明 |
---|---|
Schedulers.immediate() | 預設的 Scheduler,直接在目前線程運作,相當于不指定線程 |
Schedulers.newThread() | 啟用新線程,并在新線程執行操作 |
Schedulers.io() | I/O 操作(讀寫檔案、讀寫資料庫、網絡資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,差別在于 io() 的内部實作是是用一個無數量上限的線程池,可以重用空閑的線程,是以多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的線程 |
Schedulers.computation() | 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU |
AndroidSchedulers.mainThread() | Android 主線程 |
4.4 Disposable 和 CompositeDisposable
最後介紹一下這兩個類,
Disposable
前文出現過,在 Observer 的 onSubscribe 函數中,有一個 Disposable 類型的參數:
override fun onSubscribe(d: Disposable) {}
,通過前面介紹我們知道,Observable 和 Observer 建立訂閱關系時會調用 onSubscribe 方法,但是沒有說這個參數的作用。
4.4.1 DisPosable
Disposable 的 dispose() 函數可以用來解除訂閱,這樣就不會收到 Observable 發射的事件:
var dis ?= null
val observable = Observable.fromIterable(mutableListOf("Hello", "RxJava", "GoodBye"))
val observer = object : Observer<String> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
dis=d
Log.e("abc", "-----onSubscribe-----$d")
}
override fun onNext(t: String) {
if (t=="Hello") dis.dispose()
Log.e("abc", "-----onNext-----$t")
}
override fun onError(e: Throwable) {
}
}
observable.subscribe(observer)
// log 列印
-----onNext-----Hello
可以看到,調用
dis.dispose()
後,就不在列印上遊發射的"RxJava"和"GoodBye"了。
4.4.2 CompositeDisposable
CompositeDisposable 可以用來管理多個 Disposable,通過
add()
方法添加 Disposable 對象,然後在 onDestroy 方法裡面調用
clear()
或者
dispose()
來清除所有的 Disposable,這樣可以防止記憶體洩漏。
val cDis = CompositeDisposable()
// ...代碼省略
override fun onSubscribe(d: Disposable) {
cDis.add(d)
}
// ...代碼省略
override fun onDestroy() {
super.onDestroy()
cDis.clear()
}
多說一句,通過檢視
RxJava2CallAdapterFactory.create()
源碼可知,
dispose()
方法能主動斷開 Observable 和 Observer 之間的連接配接,還能取消 Retrofit 的網絡請求,是以放心的用吧。