天天看點

RxJava 執行個體1.定義2.作用3. 特點4.使用5.優雅實作其他方法observeOn:觀察者的執行線程subscribeOn:被觀察者的執行線程RXView 防抖

1.定義

RxJava

 是一個 基于事件流、實作異步操作的庫

2.作用

用于實作異步操作,類似于 

Android

中的 

AsyncTask

 、

Handler+

new Thread的作用

3. 特點

由于 

RxJava

的使用方式是:基于事件流的鍊式調用,是以使得 

RxJava

  • 邏輯簡潔
  • 實作優雅
  • 使用簡單

更重要的是,随着程式邏輯的複雜性提高,它依然能夠保持簡潔 & 優雅

  • Rxjava

    原理 基于 一種擴充的觀察者模式
  • Rxjava

    的擴充觀察者模式中有4個角色:
角色 作用 類比
被觀察者(Observable) 産生事件 顧客
觀察者(Observer) 接收事件,并給出響應動作 廚房
訂閱(Subscribe) 連接配接 被觀察者 & 觀察者 服務員
事件(Event) 被觀察者 & 觀察者 溝通的載體 菜式

4.使用

導包

implementation 'io.reactivex.rxjava2:rxjava:2.1.3'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'com.jakewharton.rxbinding2:rxbinding:2.2.0'
           

RxJava 執行個體1.定義2.作用3. 特點4.使用5.優雅實作其他方法observeOn:觀察者的執行線程subscribeOn:被觀察者的執行線程RXView 防抖

 1.定義被觀察者

三種方式:Observable.create、Observable.just、Observable.fromArray

public void buyFood(){
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                    e.onNext("potato");
                    e.onNext("tomato");
                    e.onNext("noodles");
                    e.onComplete();
            }
        });

        Observable<String> observable1 = Observable.just("potato","tomato","noodles");
        String [] foods = new String[]{"potato","tomato","noodles"};
        Observable<String> observable2 = Observable.fromArray(foods);
    }
           

2.定義觀察者

使用observer或者subscriber

Observer<String> mObserver = new Observer<String>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {

        }

        @Override
        public void onNext(@NonNull String s) {

        }

        @Override
        public void onError(@NonNull Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    };
           
Subscriber<String> mStringSubscriber = new Subscriber<String>() {
        @Override
        public void onSubscribe(Subscription s) {
            
        }

        @Override
        public void onNext(String s) {

        }

        @Override
        public void onError(Throwable t) {

        }

        @Override
        public void onComplete() {

        }
    };
           

特别注意:2種方法的差別,即Subscriber 抽象類與Observer 接口的差別 

// 相同點:二者基本使用方式完全一緻(實質上,在RxJava的 subscribe 過程中,Observer總是會先被轉換成Subscriber再使用)

// 不同點:Subscriber抽象類對 Observer 接口進行了擴充,新增了兩個方法:

    // 1\. onStart():在還未響應事件前調用,用于做一些初始化工作

    // 2\. unsubscribe():用于取消訂閱。在該方法被調用後,觀察者将不再接收 & 響應事件

    // 調用該方法前,先使用 isUnsubscribed() 判斷狀态,确定被觀察者Observable是否還持有觀察者Subscriber的引用,如果引用不能及時釋放,就會出現記憶體洩露

3.訂閱

observable.subscribe(observer);

5.優雅實作

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                    e.onNext("potato");
                    e.onNext("tomato");
                    e.onNext("noodles");
                    e.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull String s) {

            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
           

其他方法

observeOn:觀察者的執行線程

observeOn 作用于該操作符之後直到出現新的observeOn操作符

subscribeOn:被觀察者的執行線程

subscribeOn 作用于該操作符之前的 Observable 的建立操符作以及 doOnSubscribe 操作符 ,換句話說就是 doOnSubscribe 以及 Observable 的建立操作符總是被其之後最近的 subscribeOn 控制

執行個體

//将字元串轉換成double類型
    String path = "12.3";
    public void test(){
        //輸入事件
        Observable.just(path)
                //對事件進行處理
                .map(new Function<String, Double>() {
            @Override
            public Double apply(@NonNull String s) throws Exception {
                return Double.parseDouble(s);
            }
        })
                //指定被觀察者執行線程
                .subscribeOn(Schedulers.io())
                //指定觀察者執行線程
                .observeOn(AndroidSchedulers.mainThread())
                //訂閱觀察者
                .subscribe(
                new Observer<Double>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Double aDouble) {

                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }
           

Rxjava中的

Scheduler

相當于線程控制器,Rxjava通過它來指定每一段代碼應該運作在什麼樣的線程。Rxjava提供了5種排程器:

  • .io()
  • .computation()
  • .immediate()
  • .newThread()
  • .trampoline()

另外,Android還有一個專用的

AndroidSchedulers.mainThread()

指定操作将在Android主線程運作。Rxjava通過

subscribeOn()

observeOn()

兩個方法來對線程進行控制,

subscribeOn()

指定

subscribe()

時間發生的線程,也就是事件産生的線程,

observeOn()

指定

Subscriber

做運作的線程,也就是消費事件的線程。

Schedulers.io() 用于I/O操作,比如:讀寫檔案,資料庫,網絡互動等等。行為模式和

newThread()

差不多,重點需要注意的是線程池是無限制的,大量的I/O排程操作将建立許多個線程并占用記憶體

Schedulers.computation()計算工作預設的排程器,這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。

Schedulers.immediate()  這個排程器允許你立即在目前線程執行你指定的工作。這是預設的

Scheduler

Schedulers.newThread()    它為指定任務啟動一個新的線程。

Schedulers.trampoline()

當我們想在目前線程執行一個任務時,并不是立即,我們可以用

trampoline()

将它入隊。這個排程器将會處理它的隊列并且按序運作隊列中每一個任務。

RXView 防抖

private void test3() {
        RxView.clicks(控件).throttleFirst(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Object o) {
                Log.e(TAG, "onNext: 響應事件....." );
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
           

RxJava篇(應用場景) - 簡書 (jianshu.com)