天天看點

RxJava RxAndroid理論知識詳解

Rx并不是一種新的語言,而是一種普通的Java模式,類似于觀察者模式(Observer Pattern),可以将它看作一個普通的Java類庫,是以你可以立即使用RxJava。而RxAndroid是RxJava的一個針對Android平台的擴充,主要用于 Android 開發

  1. API 介紹和原理簡析

    RxJava 的異步實作,是通過一種擴充的觀察者模式來實作的。

    RxJava 有四個基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。

    Observable 和 Observer 通過 subscribe() 方法實作訂閱關系,進而 Observable 可以在需要的時候發出事件來通知 Observer。

    與傳統觀察者模式不同, RxJava 的事件回調方法除了普通事件 onNext() (相當于 onClick() / onEvent())之外,

    還定義了兩個特殊的事件:onCompleted() 和 onError()。

    onCompleted(): 事件隊列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為标志。

    onError(): 事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。

    在一個正确運作的事件序列中, onCompleted() 和 onError() 有且隻有一個,并且是事件序列中的最後一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在隊列中調用了其中一個,就不應該再調用另一個。

  2. 基本實作
) 建立觀察者Observer
        Observer 即觀察者,它決定事件觸發的時候将有怎樣的行為。 RxJava 中的 Observer 接口的實作方式:
Observer<String > observer = new Observer<String>() {
            @Override
            public void onNext(String s) {
                Log.e(tag, "Item: " + s);
            }

            @Override
            public void onCompleted() {
                Log.e(tag, "Completed!");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(tag, "Error!");
            }
        };
除了 Observer 接口之外,RxJava 還内置了一個實作了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 接口進行了一些擴充,但他們的基本使用方式是完全一樣的:
Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                Log.e(tag, "Item: " + s);
            }

            @Override
            public void onCompleted() {
                Log.e(tag, "Completed!");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(tag, "Error!");
            }
        };
        Observer 和 Subscriber它們的差別對于使用者來說主要有兩點:
    onStart(): 這是 Subscriber 增加的方法。它會在 subscribe 剛開始,而事件還未發送之前被調用,可以用于做一些準備工作,例如資料的清零或重置。這是一個可選方法,預設情況下它的實作為空。需要注意的是,如果對準備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執行), onStart() 就不适用了,因為它總是在 subscribe 所發生的線程被調用,而不能指定線程。要在指定的線程來做準備工作,可以使用 doOnSubscribe() 方法,具體可以在後面的文中看到。
    unsubscribe(): 這是 Subscriber 所實作的另一個接口 Subscription 的方法,用于取消訂閱。在這個方法被調用後,Subscriber 将不再接收事件。一般在這個方法調用前,可以使用 isUnsubscribed() 先判斷一下狀态。 unsubscribe() 這個方法很重要,因為在 subscribe() 之後, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放,将有記憶體洩露的風險。是以最好保持一個原則:要在不再使用的時候盡快在合适的地方(例如 onPause() onStop() 等方法中)調用 unsubscribe() 來解除引用關系,以避免記憶體洩露的發生。
) 建立 Observable
        Observable 即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件。 RxJava 使用 create() 方法來建立一個 Observable ,并為它定義事件觸發規則:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello");
                subscriber.onNext("Hi");
                subscriber.onNext("Aloha");
                subscriber.onCompleted();
            }
        });
Observable observable = Observable.just("Hello","Hi","Aloha"); 将傳入的參數依次發送出來。
List<String> list = new ArrayList();
list.add( "1" ) ;
list.add( "2" ) ;
list.add( "3" ) ;
Observable observable = Observable.from(list);
from(T[]) / from(Iterable<? extends T>) : 将傳入的數組或 Iterable 拆分成具體對象後,依次發送出來。
    3) Subscribe (訂閱)
    建立了 Observable 和 Observer 之後,再用 subscribe() 方法将它們聯結起來,整條鍊子就可以工作了。代碼形式很簡單:
    observable.subscribe(observer);
    // 或者:
    observable.subscribe(subscriber);
           

3 線程控制 —— Scheduler (一)

在RxJava 中,Scheduler ——排程器,相當于線程控制器,RxJava 通過它來指定每一段代碼應該運作在什麼樣的線程。RxJava 已經内置了幾個 Scheduler ,它們已經适合大多數的使用場景:

Schedulers.immediate(): 直接在目前線程運作,相當于不指定線程。這是預設的 Scheduler。

Schedulers.newThread(): 總是啟用新線程,并在新線程執行操作。

Schedulers.io(): I/O 操作(讀寫檔案、讀寫資料庫、網絡資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,差別在于 io() 的内部實作是是用一個無數量上限的線程池,可以重用空閑的線程,是以多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的線程。

Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。

AndroidSchedulers.mainThread(),它指定的操作将在 Android 主線程運作。

有了這幾個 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了。 * subscribeOn(): 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫做事件産生的線程。 * observeOn(): 指定 Subscriber 所運作在的線程。或者叫做事件消費的線程。

Observable.just(, , , )
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.e(tag, "number:" + number);
        }
    });
           

4 變換

所謂變換,就是将事件序列中的對象或整個序列進行加工處理,轉換成不同的事件或事件序列

map(): 事件對象的直接變換,它是 RxJava 最常用的變換。

map()使用
Observable.create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.e( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .observeOn(Schedulers.newThread())    //新線程
                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Log.e( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.e( "rx_subscribe" , Thread.currentThread().getName());
                    }
                }) ;
flatMap(): 這是一個很有用但非常難了解的變換,flatMap() 中傳回的是個 Observable 對象,并且這個 Observable 對象并不是被直接發送到了 Subscriber 的回調方法中
    flatMap() 的原理是這樣的: 使用傳入的事件對象建立一個 Observable 對象; 并不發送這個 Observable, 而是将它激活,于是它開始發送事件; 每一個建立出來的 Observable 發送的事件,都被彙入同一個 Observable ,而這個 Observable 負責将這些事件統一交給 Subscriber 的回調方法
   flatMap()使用  
   Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);
    ) 變換的原理:lift()
    在 Observable 執行了 lift(Operator) 方法之後,會傳回一個新的 Observable,這個新的 Observable 會像一個代理一樣,負責接收原始的 Observable 發出的事件,并在處理後發送給 Subscriber。
    ) compose: 對 Observable 整體的變換
    它和 lift() 的差別在于, lift() 是針對事件項和事件序列的,而 compose() 是針對 Observable 自身進行變換
Observable.just("123")
       .compose(RxUtil.<String>applySchedulers())
       .subscribe(new Action1<String>() {
           @Override
           public void call(String s) {

               Log.e("call---subscribe ",Thread.currentThread().getName() +";内容="+s);
           }
       });
           

5 線程控制:Scheduler (二)

操作符(Operators):操作符就是為了解決對Observable對象的變換的問題
    Observable和Subscriber可以做任何事情
    Observable可以是一個資料庫查詢,Subscriber用來顯示查詢結果;Observable可以是螢幕上的點選事件,Subscriber用來響應點選事件;Observable可以是一個網絡請求,Subscriber用來顯示請求結果。

    Observable和Subscriber是獨立于中間的變換過程的。
    在Observable和Subscriber中間可以增減任何數量的map。整個系統是高度可組合的,操作資料是一個很簡單的過程。
    filter()輸出和輸入相同的元素,并且會過濾掉那些不滿足檢查條件的。
    take()輸出最多指定數量的結果
    doOnNext()允許我們在每次輸出一個元素之前做一些額外的事情,比如這裡的儲存标題

    subscribeOn(): 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫做事件産生的線程。
    observeOn(): 指定 Subscriber 所運作在的線程。或者叫做事件消費的線程。

    使用Observable.fromCallable()方法有兩點好處:

    擷取要發送的資料的代碼隻會在有Observer訂閱之後執行。
    擷取資料的代碼可以在子線程中執行。

    zip  操作符,合并多個觀察對象的資料。并且允許 Func2()函數重新發送合并後的資料

    scan累加器操作符的使用

    filter 過濾操作符的使用

    take :取前n個資料
    takeLast:取後n個資料
    first 隻發送第一個資料
    last 隻發送最後一個資料
    skip() 跳過前n個資料發送後面的資料
    skipLast() 跳過最後n個資料,發送前面的資料
    elementAt 、elementAtOrDefault
    elementAt() 發送資料序列中第n個資料 ,序列号從開始
    如果該序号大于資料序列中的最大序列号,則會抛出異常,程式崩潰
    是以在用elementAt操作符的時候,要注意判斷發送的資料序列号是否越界
    elementAtOrDefault( int n , Object default ) 發送資料序列中第n個資料 ,序列号從開始。
    如果序列中沒有該序列号,則發送預設值

    startWith() 插入資料

    delay操作符,延遲資料發送

    Timer  延時操作符的使用

    interval 輪詢操作符,循環發送資料,資料從開始遞增

    doOnNext() 操作符,在每次 OnNext() 方法被調用前執行
    使用場景:從網絡請求資料,在資料被展示前,緩存到本地

    、Buffer 操作符
    Buffer( int n )      把n個資料打成一個list包,然後再次發送。
    Buffer( int n , int skip)   把n個資料打成一個list包,然後跳過第skip個資料。

    throttleFirst 操作符
    使用場景:、button按鈕防抖操作,防連續點選   、百度關鍵詞聯想,在一段時間内隻聯想一次,防止頻繁請求伺服器   

    distinct    過濾重複的資料

    distinctUntilChanged()  過濾連續重複的資料

    debounce() 操作符
    一段時間内沒有變化,就會發送一個資料。

    doOnSubscribe() 
    使用場景: 可以在事件發出之前做一些初始化的工作,比如彈出進度條等等

    range 操作符的使用 

    defer 操作符

    just操作符是在建立Observable就進行了指派操作,而defer是在訂閱者訂閱時才建立Observable,此時才進行真正的指派操作。

    .subscribeOn(Schedulers.io())  和 .observeOn(AndroidSchedulers.mainThread()) 寫的位置不一樣,造成的結果也不一樣。從例中可以看出 map() 操作符預設運作在事件産生的線程之中。事件消費隻是在 subscribe() 裡面。
    對于 create() , just() , from()   等                 --- 事件産生   
    map() , flapMap() , scan() , filter()  等    --  事件加工

    subscribe()                                          --  事件消費

    事件産生:預設運作在目前線程,可以由 subscribeOn()  自定義線程
    事件加工:預設跟事件産生的線程保持一緻, 可以由 observeOn() 自定義線程

    事件消費:預設運作在目前線程,可以有observeOn() 自定義

    如果隻規定了事件産生的線程,那麼事件消費線程将跟随事件産生線程。

   如果隻規定了事件消費的線程,那麼事件産生的線程和 目前線程保持一緻
           

6 Retrofit的使用就是以下幾步:

定義接口,參數聲明,Url都通過Annotation指定

通過RestAdapter生成一個接口的實作類(動态代理)

調用接口請求資料

建立Retrofit 執行個體:

public static final String BASE_URL = "http://api.myservice.com";
    Retrofit retrofit = new Retrofit.Builder()
        .baseUrl(BASE_URL)
        .addConverterFactory(GsonConverterFactory.create())
        .build();

定義 Endpoints:interface 用下面的方式定義了每個endpoint 
    public interface MyApiEndpointInterface {
        // Request method and URL specified in the annotation
        // Callback for the parsed response is the last parameter

        @GET("/users/{username}")
        Call<User> getUser(@Path("username") String username);

        @GET("/group/{id}/users")
        Call<List<User>> groupList(@Path("id") int groupId, @Query("sort") String sort);

        @POST("/users/new")
        Call<User> createUser(@Body User user);
    }   
Multipart forms:
    如果你要送出多參數表單資料(multi-part form data),可以使用@Multipart與@Part注解:
    String username = "sarahjean";
    Call<User> call = apiService.getUser(username);
    call.enqueue(new Callback<User>() {
        @Override
        public void onResponse(Response<User> response) {
            int statusCode = response.code();
            User user = response.body();  
        }

        @Override
        public void onFailure(Throwable t) {
            // Log error here since request failed
        }
    });