天天看點

RxJava(RxAndroid)基本使用入門

前言

  RxAndroid是RxJava在Android上的一個擴充,大牛JakeWharton的項目。據說和Retorfit、OkHttp組合起來使用,效果不是一般的好。而且用它似乎可以完全替代eventBus和OTTO,這麼牛的東西當然要研究研究了 ,看看它到底有多厲害。

正文

相關資源

  RxJava的GitHub位址:https://github.com/ReactiveX/RxJava 

  RxAndroid的GitHub位址:https://github.com/ReactiveX/RxAndroid 

  中文文檔:https://mcxiaoke.gitbooks.io/rxdocs/content/ 

  一篇寫的比較好的入門RxJava的文章的位址:http://gank.io/post/560e15be2dca930e00da1083

1.RxJava是幹嘛的

  Rx(Reactive Extensions)是一個庫,用來處理事件和異步任務,在很多語言上都有實作,RxJava是Rx在Java上的實作。簡單來說,RxJava就是處理異步的一個庫,最基本是基于觀察者模式來實作的。通過Obserable和Observer的機制,實作所謂響應式的程式設計體驗。 

  Android的童鞋都知道,處理異步事件,現有的AsyncTask、Handler,不錯的第三方事件總線EventBus、OTTO等等都可以處理。并且大部分童鞋應該都很熟練了。而且經我目前的學習來看,RxJava這個庫,上手确實有門檻,不是拿來就能用。但是作為一個猿,那些可能出現的優秀的架構技術,及時的跟進和學習是必要的,從中汲取營養才能幫助自己成長。況且有童鞋已經表示,它完全可以替代EventBus和OTTO,來看看吧。

2.RxJava的優勢

  最概括的兩個字:簡潔。而且當業務越繁瑣越複雜時這一點就越顯出優勢——它能夠保持簡潔。 

  簡單的demo看不出來,真正投入項目使用了應該就有體會了。它提供的各種功能強悍的操作符真的很強大。

3.基本使用流程

  這裡隻介紹Android Studio的接入方式,如果你還在用Eclipse的話,我建議你換了。 

  配置buile.gradle:(以下為目前最新版本,如有更新請到上述GitHub連結檢視更新)

dependencies {
  compile 'io.reactivex:rxandroid:1.2.1'
  compile 'io.reactivex:rxjava:1.1.6'
  }           
  • 1
  • 2
  • 3
  • 4

  配置完之後就可以使用RxJava的API了。介紹兩個個關鍵的類: 

  (1)Observable (2)Subscriber 即:被觀察者(Observable)和觀察者(Subscriber),其實我覺得叫釋出者和訂閱者更好了解一些,但大家都叫被觀察者和觀察者。 

  主幹的使用過程就是1.建立被觀察者。2.建立觀察者。3.将二者建立聯系。完畢。然後被觀察中發出資訊觸發觀察者的動作,執行相應的方法,就這樣。你先别急着吐槽它很平庸。它的強大在于這個過程中提供的各種操作變換的技巧會讓你可以簡潔的處理相當繁瑣的代碼邏輯。 

先看一個簡單的demo:

//建立一個被觀察者(釋出者)
        Observable observable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext();
                subscriber.onNext();
                subscriber.onNext();
                subscriber.onCompleted();
            }
        });

        //建立一個觀察者
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted.. ");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "subscriber onError.. " + e.getMessage());
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext.. integer:" + integer);
            }
        };

        //注冊觀察者(這個方法名看起來有點怪,還不如寫成regisiterSubscriber(..)或者幹脆addSubscriber(..))
        //注冊後就會開始調用call()中的觀察者執行的方法 onNext() onCompleted()等
        observable.subscribe(subscriber);           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

  上面的例子中,當Observable發射資料時,會依次調用Subscriber的onNext()方法,将發射的資料作為參數傳給onNext(),如果出錯,則會調用Subscriber的onError()方法,完成所有資料發射後,調用onCompleted()方法,整個過程完畢。 

  但是,subcribe()方法預設在目前線程被調用。是以,這樣使用的話,被觀察者和觀察者的所有的動作都是在同一個線程完成的,沒卵用… 

  但是當然肯定不會就這個程度了,RxJava有兩個方法可以很友善的指定觀察者和被觀察者代碼運作的線程,RxAndroid還有一個擴充,可以指定在UI線程運作。你懂的! 

如下:

//設定觀察者和釋出者代碼所要運作的線程後注冊觀察者
observable.subscribeOn(Schedulers.immediate())//在目前線程執行subscribe()方法
.observeOn(AndroidSchedulers.mainThread())//在UI線程執行觀察者的方法
.subscribe(subscriber);           
  • 1
  • 2
  • 3
  • 4

  通過Scheduler作為參數來指定代碼運作的線程,非常友善,好用到不行…其他常用的參數還有: 

  Schedulers.immediate(): 直接在目前線程運作,相當于不指定線程。這是預設的 Scheduler。  

  Schedulers.newThread(): 總是啟用新線程,并在新線程執行操作。 

  Schedulers.io(): I/O 操作(讀寫檔案、讀寫資料庫、網絡資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,差別在于 io() 的内部實作是是用一個無數量上限的線程池,可以重用空閑的線程,是以多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建不必要的線程。 

  Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。 

  另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主線程運作。

4.Observable建立方式

  以上介紹了主幹使用流程,從這裡我們往細一點再看。前文說了,RxJava的強大之處在于它的各種操作符。在建立Observable對象的方式上,同樣有很多友善的操作符的實作,上面是通過Observable.create()方法建立的observable對象,這裡介紹其他幾個常用的方法。 

   

  通過from建立Observable:

     //Teacher為一個資料Bean,包含姓名,年齡,住址三個字段
     List<Teacher> teachers = new ArrayList<>();
        for (int i = ; i < ; i++) {
            teachers.add(new Teacher("name" + i, i, "place" + i));
        }
        //from方法支援繼承了Interable接口的參數,是以常用的資料結構(Map、List..)都可以轉換
        Observable fromObservale = Observable.from(teachers);
        fromObservale.subscribe(new Subscriber<Teacher>() {
            @Override
            public void onCompleted() {
                Log.i(TAG, "from(teachers)  onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "from(teachers)  " + e.getMessage());
            }

            @Override
            public void onNext(Teacher teacher) {
                //依次接收到teachers中的對象
                Log.d(TAG, "from(teachers)  onNext:" + teacher.toString());
            }
        });           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

  用from方法建立Observable,可以傳入一個數組,或者一個繼承了Iterable的類的對象作為參數,也就是說,java中常用的資料結構如List、Map等都可以直接作為參數傳入from()方法用以建構Observable。這樣,當Observable發射資料時,它将會依次把序列中的元素依次發射出來。 

   

  通過just建立Observable:  

//Just類似于From,但是From會将數組或Iterable的元素具取出然後逐個發射,而Just隻是簡單的原樣發射,将數組或Iterable當做單個資料。
//Just接受一至九個參數,傳回一個按參數清單順序發射這些資料的Observable
Observable justObservable = Observable.just(, "someThing", false, f, new Teacher("Jhon", , "NewYork"));
justObservable.subscribe(new Subscriber() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "just(...)  onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "just(...)  onError:" + e.getMessage());
    }

    @Override
    public void onNext(Object o) {
        Log.d(TAG, "just(...)  onNext:" + o.toString());
    }
});           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

  just直接接收object作為參數,原樣發射出來,也是非常友善的。 

   

通過timer建立Observable:  

//timer()建立一個Observable,它在一個給定的延遲後發射一個特殊的值 設定執行方法在UI線程執行
//延時兩秒後發射值
//實測 延時2s後發送了一個0
Observable timerObservable = Observable.timer(, TimeUnit.SECONDS, AndroidSchedulers.mainThread());
   timerObservable.subscribe(
           new Subscriber() {
               @Override
               public void onCompleted() {
                   Log.i(TAG, "timer(...)  onCompleted");
                   refreshStr("timer(...)  onCompleted\n");
               }

               @Override
               public void onError(Throwable e) {
                   Log.e(TAG, "timer(...)  onError:" + e.getMessage());
                   refreshStr("timer(...)  onError:" + e.getMessage());
               }

               @Override
               public void onNext(Object o) {
                   Log.d(TAG, "timer(...)  onNext:" + o.toString());
                   refreshStr("timerObservable 延時兩秒觸發 發送值:" + o.toString());
               }
           }
   );           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

  timer有定時的作用,延時發送一個值0。 

   

通過range建立Observable(這裡疊加使用一個repeat方法):  

//range 發射從n到m的整數序列 可以指定Scheduler設定執行方法運作的線程
//repeat方法可以指定重複觸發的次數
Observable rangeObservable = Observable.range(, ).repeat();
rangeObservable.subscribe(
//在不寫觀察者的情況下,可以使用Action1和Action0這兩個接口來實作不完整定義的回調; 參見:ActionSubscriber
//Action1<T>可以代替實作onNext(); Action1<Throwable>可以代替實作onError(); Action0可以代替實作onConplete()
        new Action1() {
            @Override
            public void call(Object o) {
                Log.e(TAG, "range(3, 7).repeat(2)  onNext:"+o.toString());
            }
        },
        new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                Log.e(TAG, "range(3, 7).repeat(2)  "+throwable.getMessage());
            }
        },
        new Action0() {
            @Override
            public void call() {
                Log.i(TAG, "range(3, 7).repeat(2)  onCompleted");
            }
        });           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

  range發射從n到m的整數序列,repeat可以指定重複次數,以上發射的次序為:3,4,5,6,7,3,4,5,6,7。這裡用到的Action0和Action1是兩個可以替代Subscriber的接口,具體可以參見相關文檔和源碼實作,這裡不深入介紹。 

  其他還有Interval、Defer、Start等方法就不一一介紹了,本文主要是幫助初次接觸的童鞋入門,RxJava的操作符非常豐富,這裡很難一一說明,更多的内容要還需要大家自己去熟悉和探究。

5.變換操作

  除了多樣的Observable建立方式,RxJava還有一個神奇的操作就是變換。通過自己定義的方法,你可以将輸入的值變換成另一種類型再輸出(比如輸入url,輸出bitmap),單一變換、批量變換、甚至實作雙重變換,嵌套兩重異步操作!并且代碼格式一如既往的幹淨平整。是不是很牛? 

   

  使用map()方法做轉換:

Runnable run = new Runnable() {
     @Override
     public void run() {
         //将檔案路徑轉換為bitmap發出 觀察者直接收到bitmap進行處理
         Observable observable = Observable.just(imgFilePath);
         observable.map(new Func1<String, Bitmap>() {
             @Override
             public Bitmap call(String imgFilePath) {
                 return getBitmapFromAssets(imgFilePath);
             }
         }).subscribeOn(Schedulers.immediate())//目前線程(子線程)釋出
                 .observeOn(AndroidSchedulers.mainThread())//UI線程執行(更新圖檔)
                 .subscribe(new Subscriber<Bitmap>() {
                     @Override
                     public void onCompleted() {
                         Log.i(TAG, "observable.map(..)  onCompleted");
                     }

                     @Override
                     public void onError(Throwable e) {
                         Log.i(TAG, "observable.map(..)  onError" + e.getMessage());
                     }

                     @Override
                     public void onNext(Bitmap bitmap) {
                         //顯示圖檔
                         iv.setImageBitmap(bitmap);
                     }
                 });
     }
 };
 new Thread(run).start();           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

  map()方法是最基本的變換操作,這裡隻變換了一個資料,将檔案路徑解析為Bitmap顯示出來。你當然也可以多傳入幾個參數或者用from操作符傳入一個數組或者集合等,批量操作,并且同時指定代碼運作的線程。而且這些所有的操作都可以在一條鍊式代碼中全部完成,易讀易維護。你是不是已經有一點體會到它的威力了? 

   

  flatMap()實作雙重變換 

  flatMap()将一個發射資料的Observable變換為多個Observables,然後将它們發射的資料合并後放進一個單獨的Observable。即:第一次轉換時,它依次将輸入的資料轉換成一個Observable,然後将這些Observable發射的資料集中到一個Observable裡依次發射出來。覺得莫名其妙?來看一個實際例子:

Subscriber subscriber = new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
        Log.i(TAG,"Observable.just(array1,array2).flatMap  onCompleted\n\n");
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG,"Observable.just(array1,array2).flatMap   onError  "+e.getMessage());
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG,"Observable.just(array1,array2).flatMap  integer = "+integer);
    }
};

//flatMap可以實作一個雙重轉換,在它的回調方法中會傳回一個observable對象,但它并不會直接發射這個對象
//而是将這個observable對象要發射的值 集中到一個新的observable對象中依次發射
//如本例,第一層Observable依次發射兩個數組,經過flatmap轉換之後,變成變成兩個依次發射數組元素的observable
// 最後在subscriber中接收到的直接是整型數,等于将兩個數組"鋪開"了,直接發射整數,這就是大概地"flat"的含義吧
// flatMap方法可以很靈活的使用,實作雙重變換,滿足很多不同情況下的需求,比如處理嵌套的異步代碼等,非常棒!
Integer[] array1 = {, , , }, array2 = {, , , };
Observable.just(array1,array2).flatMap(new Func1<Integer[], Observable<?>>() {
    @Override
    public Observable<?> call(Integer[] ints) {
        Observable observable = Observable.from(ints);
        return observable;
    }
}).subscribe(subscriber);           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

  這裡flatMap()方法将最初傳入的兩個數組在第一次變換時,通過from操作符變換成兩個Observable,然後在将這兩個Observable發射的資料全部集中到一個新的Observable中集中發射,等于将兩個數組”鋪開”了,依次發射出來它們的元素。具體轉換的方法由你指定,使用的方式是比較靈活的。 

  比如有的商城類應用的需求:先要拿到某類别的一個産品清單,然後清單中有具體産品展示圖檔的url,需要你拿到産品清單資訊後依次去請求圖檔,成功後更新到UI頁面上,使用flatMap,你肯定知道怎麼寫了吧,是不是比CallBack跳來跳去的舒服一些?

  scan()變換 

  scan操作符對原始Observable發射的第一項資料應用一個函數,然後将那個函數的結果作為自己的第一項資料發射。它将函數的結果同第二項資料一起填充給這個函數來産生它自己的第二項資料。它持續進行這個過程來産生剩餘的資料序列。 

  當看到這裡的時候,我已經由衷的在感歎,這些操作符實在太TM豐富了,然後對它的強大已經開始有所體會和感悟了。這種産生斐波那契數列的操作都給封裝進去了,而且函數由你自定義,你能用它做成什麼,可能在靈感到來之前你自己都想不到。 

  demo代碼:

//scan 會将輸入的第一個元素當作參數做一個函數運算(函數由你實作,規定需要兩個參數,此時另一個預設沒有),然後發射結果
// 同時,運算結果會被當作函數的與第二個參數與第二個元素再進行函數運算,完成後發射結果
// 然後将這個結果與第三個元素作為函數的參數再次運算...直到最後一個元素
Observable.just(, , , ).scan(new Func2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) {
        //integer是第一個元素或上一次計算的結果,integer2是下一輪運算中新的序列中元素
        Log.d(TAG, "scan call   integer:" + integer + "   integer2:" + integer2);
        return integer + integer2;
    }
}).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "Observable.just(1,2,3,4).scan   onCompleted..");
        initViewState();
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "Observable.just(1,2,3,4).scan  onError  " + e.getMessage());
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "Observable.just(1,2,3,4).scan  onNext()..  integer = " + integer);
        /**
         * 第一次為1,然後是3(1+2),6(3+3),10(6+4)
         */
    }
});           
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

  注釋和上面的說明都很清晰了,就不再贅述。同樣,關于轉換操作,也還有很多其他的操作符,如wiindow() buffer() 等已實作的方法,具體參見文檔吧。

6.過濾、結合操作

  在文檔的分類中,還有兩片基礎API是過濾和結合的操作符,例如:Filter、Skip、Take、Merage、Zip等等,本來打算一起列舉的,但是想想其實如果熟悉了上面的内容,這兩塊相關的API上手其實也很容易了。如果入門目的已經達到,再講這個顯得有點啰嗦。是以略去,如果以後有心得,在開篇另講那些操作符的使用,RxJava的知識,主要還是要靠自己多熟悉,多研究。

尾聲

  本文主要希望幫助初次接觸RxJava的童鞋入門,講的一些基礎知識,RxJava太大,内容太豐富,入門之後需要下的功夫也不少,希望大家都能day day up! 

  最後,如發現内容有誤,請斧正! 

  非常感謝!