天天看點

這可能是最好的RxJava 2.x 教程(完結版)

這可能是最好的 RxJava 2.x 入門教程系列專欄

文章連結:

這可能是最好的RxJava 2.x 入門教程(一) 這可能是最好的RxJava 2.x 入門教程(二) 這可能是最好的RxJava 2.x 入門教程(三) 這可能是最好的RxJava 2.x 入門教程(四) 這可能是最好的RxJava 2.x 入門教程(五) GitHub 代碼同步更新: https://github.com/nanchen2251/RxJava2Examples 為了滿足大家的饑渴難耐,GitHub将同步更新代碼,主要包含基本的代碼封裝,RxJava 2.x所有操作符應用場景介紹和實際應用場景,後期除了RxJava可能還會增添其他東西,總之,GitHub上的Demo專為大家傾心打造。傳送門:

為什麼要學 RxJava?

提升開發效率,降低維護成本一直是開發團隊永恒不變的宗旨。近兩年來國内的技術圈子中越來越多的開始提及

RxJava

,越來越多的應用和面試中都會有

,而就目前的情況,Android 的網絡庫基本被

Retrofit

+

OkHttp

一統天下了,而配合上響應式程式設計

可謂如魚得水。想必大家肯定被近期的

Kotlin

炸開了鍋,筆者也在閑暇之時去了解了一番(作為一個與時俱進的有理想的青年怎麼可能不與時俱進?),發現其中有個非常好的優點就是簡潔,支援函數式程式設計。是的, RxJava 最大的優點也是簡潔,但它不止是簡潔,而且是** 随着程式邏輯變得越來越複雜,它依然能夠保持簡潔 **(這貨潔身自好呀有木有)。

咳咳,要例子,猛戳這裡:

給 Android 開發者的 RxJava 詳解

什麼是響應式程式設計

上面我們提及了響應式程式設計,不少新司機對它可謂一臉懵逼,那什麼是響應式程式設計呢?響應式程式設計是一種基于異步資料流概念的程式設計模式。資料流就像一條河:它可以被觀測,被過濾,被操作,或者為新的消費者與另外一條流合并為一條新的流。

響應式程式設計的一個關鍵概念是事件。事件可以被等待,可以觸發過程,也可以觸發其它事件。事件是唯一的以合适的方式将我們的現實世界映射到我們的軟體中:如果屋裡太熱了我們就打開一扇窗戶。同樣的,當我們的天氣app從服務端擷取到新的天氣資料後,我們需要更新app上展示天氣資訊的UI;汽車上的車道偏移系統探測到車輛偏移了正常路線就會提醒駕駛者糾正,就是是響應事件。

今天,響應式程式設計最通用的一個場景是UI:我們的移動App必須做出對網絡調用、使用者觸摸輸入和系統彈框的響應。在這個世界上,軟體之是以是事件驅動并響應的是因為現實生活也是如此。

為什麼出了一個系列後還有完結版?

這些年可謂越來越流行,而在去年的晚些時候釋出了2.0正式版。大半年已過,雖然網上已經出現了大部分的 RxJava 教程(其實細心的你還是會發現 1.x 的超級多),前些日子,筆者花了大約兩周的閑暇之時寫了 RxJava 2.x 系列教程,也得到了不少回報,其中就有不少讀者覺得每一篇的教程太短,抑或是希望更多的側重适用場景的介紹,在這樣的大前提下,這篇完結版教程就此誕生,僅供各位新司機采納。

開始

RxJava 2.x 已經按照 Reactive-Streams specification 規範完全的重寫了,maven也被放在了

io.reactivex.rxjava2:rxjava:2.x.y

下,是以 RxJava 2.x 獨立于 RxJava 1.x 而存在,而随後官方宣布的将在一段時間後終止對 RxJava 1.x 的維護,是以對于熟悉 RxJava 1.x 的老司機自然可以直接看一下

2.x 的文檔

和異同就能輕松上手了,而對于不熟悉的年輕司機,不要慌,本醬帶你裝逼帶你飛,馬上就發車,坐穩了:

你隻需要在 build.gradle 中加上:

compile 'io.reactivex.rxjava2:rxjava:2.1.1'

(2.1.1為寫此文章時的最新版本)

接口變化

RxJava 2.x 擁有了新的特性,其依賴于4個基礎接口,它們分别是

  • Publisher
  • Subscriber
  • Subscription
  • Processor

其中最核心的莫過于

Publisher

Subscriber

Publisher

可以發出一系列的事件,而

Subscriber

負責和處理這些事件。

其中用的比較多的自然是

Publisher

Flowable

,它支援背壓。關于背壓給個簡潔的定義就是:

背壓是指在異步場景中,被觀察者發送事件速度遠快于觀察者的處理速度的情況下,一種告訴上遊的被觀察者降低發送速度的政策。

簡而言之,背壓是流速控制的一種政策。有興趣的可以看一下

官方對于背壓的講解

可以明顯地發現,RxJava 2.x 最大的改動就是對于

backpressure

的處理,為此将原來的

Observable

拆分成了新的

Observable

Flowable

,同時其他相關部分也同時進行了拆分,但令人慶幸的是,是它,是它,還是它,還是我們最熟悉和最喜歡的 RxJava。

觀察者模式

大家可能都知道, RxJava 以觀察者模式為骨架,在 2.0 中依舊如此。

不過此次更新中,出現了兩種觀察者模式:

  • Observable ( 被觀察者 ) / Observer ( 觀察者 )
  • Flowable (被觀察者)/ Subscriber (觀察者)

在 RxJava 2.x 中,Observable 用于訂閱 Observer,不再支援背壓(1.x 中可以使用背壓政策),而 Flowable 用于訂閱 Subscriber , 是支援背壓(Backpressure)的。

Observable

在 RxJava 1.x 中,我們最熟悉的莫過于

Observable

這個類了,筆者在剛剛使用 RxJava 2.x 的時候,建立了 一個

Observable

,瞬間一臉懵逼有木有,居然連我們最最熟悉的

Subscriber

都沒了,取而代之的是

ObservableEmmiter

,俗稱發射器。此外,由于沒有了

Subscriber

的蹤影,我們建立觀察者時需使用

Observer

。而

Observer

也不是我們熟悉的那個

Observer

,又出現了一個

Disposable

參數帶你裝逼帶你飛。

廢話不多說,從會用開始,還記得 RxJava 的三部曲嗎?

** 第一步:初始化 Observable **

** 第二步:初始化 Observer **

** 第三步:建立訂閱關系 **

Observable.create(new ObservableOnSubscribe<Integer>() { // 第一步:初始化Observable
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "Observable emit 1" + "\n");
                e.onNext(1);
                Log.e(TAG, "Observable emit 2" + "\n");
                e.onNext(2);
                Log.e(TAG, "Observable emit 3" + "\n");
                e.onNext(3);
                e.onComplete();
                Log.e(TAG, "Observable emit 4" + "\n" );
                e.onNext(4);
            }
        }).subscribe(new Observer<Integer>() { // 第三步:訂閱

            // 第二步:初始化Observer
            private int i;
            private Disposable mDisposable;

            @Override
            public void onSubscribe(@NonNull Disposable d) {      
                mDisposable = d;
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                i++;
                if (i == 2) {
                    // 在RxJava 2.x 中,新增的Disposable可以做到切斷的操作,讓Observer觀察者不再接收上遊事件
                    mDisposable.dispose();
                }
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e(TAG, "onError : value : " + e.getMessage() + "\n" );
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete" + "\n" );
            }
        });
           

不難看出,RxJava 2.x 與 1.x 還是存在着一些差別的。首先,建立

Observable

時,回調的是

ObservableEmitter

,字面意思即發射器,并且直接 throws Exception。其次,在建立的 Observer 中,也多了一個回調方法:

onSubscribe

,傳遞參數為

Disposable

Disposable

相當于 RxJava 1.x 中的

Subscription

, 用于解除訂閱。可以看到示例代碼中,在 i 自增到 2 的時候,訂閱關系被切斷。

07-03 14:24:11.663 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onSubscribe : false
07-03 14:24:11.664 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 1
07-03 14:24:11.665 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 1
07-03 14:24:11.666 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 2
07-03 14:24:11.667 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : value : 2
07-03 14:24:11.668 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: onNext : isDisposable : true
07-03 14:24:11.669 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 3
07-03 14:24:11.670 18467-18467/com.nanchen.rxjava2examples E/RxCreateActivity: Observable emit 4
           

當然,我們的 RxJava 2.x 也為我們保留了簡化訂閱方法,我們可以根據需求,進行相應的簡化訂閱,隻不過傳入對象改為了

Consumer

Consumer

即消費者,用于接收單個值,

BiConsumer

則是接收兩個值,

Function

用于變換對象,

Predicate

用于判斷。這些接口命名大多參照了 Java 8 ,熟悉 Java 8 新特性的應該都知道意思,這裡也不再贅述。

線程排程

關于線程切換這點,RxJava 1.x 和 RxJava 2.x 的實作思路是一樣的。這裡簡單的說一下,以便于我們的新司機入手。

subScribeOn

同 RxJava 1.x 一樣,

subscribeOn

用于指定

subscribe()

時所發生的線程,從源碼角度可以看出,内部線程排程是通過

ObservableSubscribeOn

來實作的。

@SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
           

ObservableSubscribeOn

的核心源碼在

subscribeActual

方法中,通過代理的方式使用

SubscribeOnObserver

包裝

Observer

後,設定

Disposable

來将

subscribe

切換到

Scheduler

線程中。

observeOn

observeOn

方法用于指定下遊

Observer

回調發生的線程。

@SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
           

線程切換需要注意的

RxJava 内置的線程排程器的确可以讓我們的線程切換得心應手,但其中也有些需要注意的地方。

  • 簡單地說,

    subscribeOn()

    指定的就是發射事件的線程,

    observerOn

    指定的就是訂閱者接收事件的線程。
  • 多次指定發射事件的線程隻有第一次指定的有效,也就是說多次調用

    subscribeOn()

    隻有第一次的有效,其餘的會被忽略。
  • 但多次指定訂閱者接收線程是可以的,也就是說每調用一次

    observerOn()

    ,下遊的線程就會切換一次。
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
                e.onNext(1);
                e.onComplete();
            }
        }).subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.io())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
                    }
                });
           

輸出:

07-03 14:54:01.177 15121-15438/com.nanchen.rxjava2examples E/RxThreadActivity: Observable thread is : RxNewThreadScheduler-1
07-03 14:54:01.178 15121-15121/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(mainThread),Current thread is main
07-03 14:54:01.179 15121-15439/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(io),Current thread is RxCachedThreadScheduler-2
           

執行個體代碼中,分别用

Schedulers.newThread()

Schedulers.io()

對發射線程進行切換,并采用

observeOn(AndroidSchedulers.mainThread()

Schedulers.io()

進行了接收線程的切換。可以看到輸出中發射線程僅僅響應了第一個

newThread

,但每調用一次

observeOn()

,線程便會切換一次,是以如果我們有類似的需求時,便知道如何處理了。

RxJava 中,已經内置了很多線程選項供我們選擇,例如有:

  • Schedulers.io()

    代表io操作的線程, 通常用于網絡,讀寫檔案等io密集型的操作;
  • Schedulers.computation()

    代表CPU計算密集型的操作, 例如需要大量計算的操作;
  • Schedulers.newThread()

    代表一個正常的新線程;
  • AndroidSchedulers.mainThread()

    代表Android的主線程

這些内置的

Scheduler

已經足夠滿足我們開發的需求,是以我們應該使用内置的這些選項,而 RxJava 内部使用的是線程池來維護這些線程,是以效率也比較高。

操作符

關于操作符,在

官方文檔

中已經做了非常完善的講解,并且筆者

前面的系列教程

中也着重講解了絕大多數的操作符作用,這裡受于篇幅限制,就不多做贅述,隻挑選幾個進行實際情景的講解。

map

map

操作符可以将一個

Observable

對象通過某種關系轉換為另一個

Observable

對象。在 2.x 中和 1.x 中作用幾乎一緻,不同點在于:2.x 将 1.x 中的

Func1

Func2

改為了

Function

BiFunction

采用 map 操作符進行網絡資料解析

想必大家都知道,很多時候我們在使用 RxJava 的時候總是和 Retrofit 進行結合使用,而為了友善示範,這裡我們就暫且采用 OkHttp3 進行示範,配合

map

doOnNext

,線程切換進行簡單的網絡請求:

1)通過 Observable.create() 方法,調用 OkHttp 網絡請求;

2)通過 map 操作符集合 gson,将 Response 轉換為 bean 類;

3)通過 doOnNext() 方法,解析 bean 中的資料,并進行資料庫存儲等操作;

4)排程線程,在子線程中進行耗時操作任務,在主線程中更新 UI ;

5)通過 subscribe(),根據請求成功或者失敗來更新 UI 。

Observable.create(new ObservableOnSubscribe<Response>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
                Builder builder = new Builder()
                        .url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                        .get();
                Request request = builder.build();
                Call call = new OkHttpClient().newCall(request);
                Response response = call.execute();
                e.onNext(response);
            }
        }).map(new Function<Response, MobileAddress>() {
                    @Override
                    public MobileAddress apply(@NonNull Response response) throws Exception {
                        if (response.isSuccessful()) {
                            ResponseBody body = response.body();
                            if (body != null) {
                                Log.e(TAG, "map:轉換前:" + response.body());
                                return new Gson().fromJson(body.string(), MobileAddress.class);
                            }
                        }
                        return null;
                    }
                }).observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(@NonNull MobileAddress s) throws Exception {
                        Log.e(TAG, "doOnNext: 儲存成功:" + s.toString() + "\n");
                    }
                }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<MobileAddress>() {
                    @Override
                    public void accept(@NonNull MobileAddress data) throws Exception {
                        Log.e(TAG, "成功:" + data.toString() + "\n");
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "失敗:" + throwable.getMessage() + "\n");
                    }
                });
           

concat

concat

可以做到不交錯的發射兩個甚至多個 Observable 的發射事件,并且隻有前一個

Observable

終止(

onComplete

) 後才會訂閱下一個

Observable

采用 concat 操作符先讀取緩存再通過網絡請求擷取資料

想必在實際應用中,很多時候(對資料操作不敏感時)都需要我們先讀取緩存的資料,如果緩存沒有資料,再通過網絡請求擷取,随後在主線程更新我們的UI。

concat

操作符簡直就是為我們這種需求量身定做。

利用

concat

的必須調用

onComplete

後才能訂閱下一個

Observable

的特性,我們就可以先讀取緩存資料,倘若擷取到的緩存資料不是我們想要的,再調用

onComplete()

以執行擷取網絡資料的

Observable

,如果緩存資料能應我們所需,則直接調用

onNext()

,防止過度的網絡請求,浪費使用者的流量。

Observable<FoodList> cache = Observable.create(new ObservableOnSubscribe<FoodList>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception {
                Log.e(TAG, "create目前線程:"+Thread.currentThread().getName() );
                FoodList data = CacheManager.getInstance().getFoodListData();

                // 在操作符 concat 中,隻有調用 onComplete 之後才會執行下一個 Observable
                if (data != null){ // 如果緩存資料不為空,則直接讀取緩存資料,而不讀取網絡資料
                    isFromNet = false;
                    Log.e(TAG, "\nsubscribe: 讀取緩存資料:" );
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            mRxOperatorsText.append("\nsubscribe: 讀取緩存資料:\n");
                        }
                    });

                    e.onNext(data);
                }else {
                    isFromNet = true;
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            mRxOperatorsText.append("\nsubscribe: 讀取網絡資料:\n");
                        }
                    });
                    Log.e(TAG, "\nsubscribe: 讀取網絡資料:" );
                    e.onComplete();
                }


            }
        });

        Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
                .addQueryParameter("rows",10+"")
                .build()
                .getObjectObservable(FoodList.class);


        // 兩個 Observable 的泛型應當保持一緻

        Observable.concat(cache,network)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<FoodList>() {
                    @Override
                    public void accept(@NonNull FoodList tngouBeen) throws Exception {
                        Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName() );
                        if (isFromNet){
                            mRxOperatorsText.append("accept : 網絡擷取資料設定緩存: \n");
                            Log.e(TAG, "accept : 網絡擷取資料設定緩存: \n"+tngouBeen.toString() );
                            CacheManager.getInstance().setFoodListData(tngouBeen);
                        }

                        mRxOperatorsText.append("accept: 讀取資料成功:" + tngouBeen.toString()+"\n");
                        Log.e(TAG, "accept: 讀取資料成功:" + tngouBeen.toString());
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName() );
                        Log.e(TAG, "accept: 讀取資料失敗:"+throwable.getMessage() );
                        mRxOperatorsText.append("accept: 讀取資料失敗:"+throwable.getMessage()+"\n");
                    }
                });
           

有時候我們的緩存可能還會分為 memory 和 disk ,實際上都差不多,無非是多寫點

Observable

,然後通過

concat

合并即可。

flatMap 實作多個網絡請求依次依賴

想必這種情況也在實際情況中比比皆是,例如使用者注冊成功後需要自動登入,我們隻需要先通過注冊接口注冊使用者資訊,注冊成功後馬上調用登入接口進行自動登入即可。

我們的

flatMap

恰好解決了這種應用場景,

flatMap

操作符可以将一個發射資料的

Observable

變換為多個

Observables

,然後将它們發射的資料合并後放到一個單獨的

Observable

,利用這個特性,我們很輕松地達到了我們的需求。

Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
                .addQueryParameter("rows", 1 + "")
                .build()
                .getObjectObservable(FoodList.class) // 發起擷取食品清單的請求,并解析到FootList
                .subscribeOn(Schedulers.io())        // 在io線程進行網絡請求
                .observeOn(AndroidSchedulers.mainThread()) // 在主線程處理擷取食品清單的請求結果
                .doOnNext(new Consumer<FoodList>() {
                    @Override
                    public void accept(@NonNull FoodList foodList) throws Exception {
                        // 先根據擷取食品清單的響應結果做一些操作
                        Log.e(TAG, "accept: doOnNext :" + foodList.toString());
                        mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n");
                    }
                })
                .observeOn(Schedulers.io()) // 回到 io 線程去處理擷取食品詳情的請求
                .flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {
                    @Override
                    public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {
                        if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {
                            return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show")
                                    .addBodyParameter("id", foodList.getTngou().get(0).getId() + "")
                                    .build()
                                    .getObjectObservable(FoodDetail.class);
                        }
                        return null;

                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<FoodDetail>() {
                    @Override
                    public void accept(@NonNull FoodDetail foodDetail) throws Exception {
                        Log.e(TAG, "accept: success :" + foodDetail.toString());
                        mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: error :" + throwable.getMessage());
                        mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n");
                    }
                });
           

善用 zip 操作符,實作多個接口資料共同更新 UI

在實際應用中,我們極有可能會在一個頁面顯示的資料來源于多個接口,這時候我們的

zip

操作符為我們排憂解難。

zip

操作符可以将多個

Observable

的資料結合為一個資料源再發射出去。

Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                .build()
                .getObjectObservable(MobileAddress.class);

        Observable<CategoryResult> observable2 = Network.getGankApi()
                .getCategoryData("Android",1,1);

        Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() {
            @Override
            public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception {
                return "合并後的資料為:手機歸屬地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who;
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        Log.e(TAG, "accept: 成功:" + s+"\n");
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        Log.e(TAG, "accept: 失敗:" + throwable+"\n");
                    }
                });
           

采用 interval 操作符實作心跳間隔任務

想必即時通訊等需要輪訓的任務在如今的 APP 中已是很常見,而 RxJava 2.x 的

interval

操作符可謂完美地解決了我們的疑惑。

這裡就簡單的意思一下輪訓。

private Disposable mDisposable;
    @Override
    protected void doSomething() {
        mDisposable = Flowable.interval(1, TimeUnit.SECONDS)
                .doOnNext(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        Log.e(TAG, "accept: doOnNext : "+aLong );
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(@NonNull Long aLong) throws Exception {
                        Log.e(TAG, "accept: 設定文本 :"+aLong );
                        mRxOperatorsText.append("accept: 設定文本 :"+aLong +"\n");
                    }
                });
    }

    /**
     * 銷毀時停止心跳
     */
    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null){
            mDisposable.dispose();
        }
    }
           

RxJava 1.x 如何平滑更新到 RxJava 2.x?

由于 RxJava 2.x 變化較大無法直接更新,幸運的是,官方為我們提供了

RxJava2Interrop

這個庫,可以友善地把 RxJava 1.x 更新到 RxJava 2.x,或者将 RxJava 2.x 轉回到 RxJava 1.x。

寫在最後

本醬看你都看到這兒了,實為未來的棟梁之才,是以且送你一本經書:

GIF.gif

做不完的開源,寫不完的矯情。歡迎掃描下方二維碼或者公衆号搜尋「nanchen」關注我的微信公衆号,目前多營運 Android ,盡自己所能為你提升。現在還有純面試系列喲~涵蓋 Java、資料結構與算法、計算機網絡、Android 基礎。如果你喜歡,為我點贊分享吧~

nanchen

繼續閱讀