天天看點

Android之響應式程式設計RxJava/RxAndroid

RxJava 在 GitHub 首頁上的自我介紹是 “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程式的庫)。

特點:簡潔

RJava能辦得到的事情,用handler+Thread或者AsyncTask都可以做到,而使用Rxjava能使異步操作代碼更簡潔。随着程式邏輯變得越來越複雜,它依然能夠保持簡潔。

因為使用一般的異步嵌套,會形成複雜的難易維護的代碼。而RxJava是一條從上到下的鍊式調用,無論多複雜得邏輯都可以用一條線表達,沒有嵌套。

友情提醒:看的過程中一定不要眨眼,看清楚這幾個字眼,很容易亂,不要搞錯。

Observable被觀察者

Observer觀察者

Subscriber觀察者

subscribe訂閱

OnSubscribe被觀察者内部成員

observeOn線程排程方法

subscribeOn線程排程方法

下面開始RxJava的學習:

首先是引入依賴:

compile 'io.reactivex:rxjava:1.0.14' 
compile 'io.reactivex:rxandroid:1.0.1' 
           

RxJava是基于擴充的觀察者模式實作的。

用我們最熟悉的按鈕+監聽器來重制觀察者模式:

①Button —>按鈕,也稱被觀察者

②OnclickListener —>監聽者,也稱觀察者

③setOnClickListener() —>設定監聽者,也稱訂閱

④onClick() —>點選事件,也稱事件

說白了也就是:Button對象通過setOnClickListener(listener)持有OnClickListener對象引用,并且調用OnClickListener對象的回調方法onClick()。那也就是被觀察者調用觀察者的回調方法,實作了被觀察者向觀察者的事件傳遞,這就是觀察者模式。

在RxJava中,也有四個概念:

①Observable —>被觀察者

②Observer/Subscriber —>觀察者

③subscribe() —>訂閱

④onNext()/onError()/onCompleted()—>事件

  • 與傳統觀察者模式不同, RxJava 的事件回調方法除了普通事件 onNext() (相當于 onClick() /

    onEvent())之外,還定義了兩個特殊的事件:onCompleted() 和 onError()。

  • onCompleted(): 事件隊列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。RxJava

    規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為标志。

  • onError(): 事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。

在一個正确運作的事件序列中, onCompleted() 和 onError() 有且隻有一個,并且是事件序列中的最後一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在隊列中調用了其中一個,就不應該再調用另一個。

下面開始來講如何實作:

一.觀察者Observer/Subscriber的建立

Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }
    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }
    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};  
           
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }
    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }
    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};
           

Observer是接口,Subscriber是實作了Observer接口的一個抽象類,兩者的使用方式一模一樣,而Subscriber做了一些擴充,而實質上,Observer是會轉成一個Subscriber後再使用的。

Subscriber與Observer的差別(Subscriber多出兩個方法):

①onStart():事件還未發送之前被調用,如果對準備工作的線程有要求, onStart() 就不适用了,因為它總是在 subscribe 所發生的線程被調用,而不能指定線程。要在指定的線程來做準備工作,可以使用 doOnSubscribe() 方法。

②unsubscribe():用于取消訂閱,調用後,Subscriber 将不再接收事件。一般在這個方法調用前,可以使用 isUnsubscribed() 先判斷一下狀态。

二:被觀察者Observable的建立

①create()

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});
           

可以看到這裡建立Observable的時候,傳入了一個OnSubscriber對象,它被生成的Observable所持有,當Observable被訂閱subscribe(subscriber)的時候,會調用OnSubscriber.call(subscriber)方法。也就是說訂閱的時候,subscribe(subscriber)傳入的subscriber對象參數,會從方法OnSubscriber.call(subscriber)傳進去執行。那也就是被觀察者調用觀察者的回調方法,實作了被觀察者向觀察者的事件傳遞,這就是觀察者模式。

②just(T…):将傳入的參數依次發出來。

③from(T[]) 或者from(Iterable):将傳入的數組或者Iterable拆分成具體對象,依次發出。

String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
           

可以看到just,from方法比較簡潔,其實這兩個方法都是基于create()方法的,也就是說,最終還是調用了create()生成的Observable對象。

這三個方法生成的Observable是完完全全等價的,當Observable對象被訂閱subscribe()的時候,将會依次調用:

onNext("Hello");
onNext("Hi");
onNext("Aloha");
onCompleted();
           

其實還有很多其他的生成Obervable的方法,這裡先不列舉。

三:subscribe()訂閱

訂閱代碼非常簡單

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
           

我們可以看看subscribe(subscriber)的核心代碼:

public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}
           

①subscriber首先會執行onStart()方法,這個在前面有提及到。

②執行OnSubscriber.call(subscriber)。

我們在create()的時候,就知道call調用的是這樣的代碼:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});
           

也就是subscriber會在Observable裡面被回調onNext()事件。

③傳回Subscription ,為了友善unsubscribe()。

其實你會發現,真正的被觀察者不是Observable,而是Observable.OnSubscribe對象。而OnSubscribe是Observable的一個成員對象。

subscribe() 還支援不完整定義的回調:

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

// 自動建立 Subscriber ,并使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動建立 Subscriber ,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動建立 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
           

Action0 是 RxJava 的一個接口,它隻有一個方法 call(),這個方法是無參無傳回值的;

Action1是 RxJava 的一個接口,它隻有一個方法 call(T param),這個方法也無傳回值,但有一個參數;

由于 onCompleted() 方法是無參無傳回值的,是以 可以用Action0 可以包裝。

由于 onNext(T obj) 和 onError(Throwable error) 也是單參數無傳回值的,是以可以用Action1包裝。

這裡給一個真是的例子

String[] names = ...;
Observable.from(names)
    .subscribe(new Action1<String>() {
        @Override
        public void call(String name) {
            Log.d(tag, name);
        }
    });
           

非常簡單,會依次列印出這個names數組。

四:Scheduler線程排程器

在 RxJava 的預設規則中,事件的發出和消費都是在同一個線程的。而觀察者模式本身的目的就是『背景處理,前台回調』的異步機制。

RxJava内置的Scheduler:

①Scheduler.immediate() :目前線程,相當于不指定線程。(預設)

②Scheduler.newThread():啟動新線程。

③Scheduler.io():I/O操作所使用的scheduler。内部實作用了一個無上限的線程池,會重用空閑線程,比newThread()效率高。

④Scheduler.computation():CPU密集計算使用的scheduler。

⑤AndroidSchedulers.mainThread():Android主線程scheduler。

使用subscriberOn()和observeOn對線程控制:

1.subscribeOn()

指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程(因為發生subscribe()訂閱的時候,會調用Observable.OnSubscribe的call方法)。或者叫做事件産生的線程。

2.observeOn()

指定 Subscriber 所運作的線程。或者叫做事件消費的線程。并且observeOn() 指定的是它之後的操作所在的線程。

不同于 observeOn() , subscribeOn() 的位置放在哪裡都可以,但它是隻能調用一次的。如果多次調用 subscribeOn() ,隻有第一次起作用,後面的都無效。observeOn()可以多次調用。

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
.subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});
           

RxJava多次切換線程栗子:

Observable.just(, , , ) // IO 線程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新線程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 線程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主線程,由 observeOn() 指定
           

至于為什麼可以多次切換線程,在後面講解了lift()變換原理之後,再做解釋。

這樣就實作了:加載圖檔将會發生在 IO 線程,而設定圖檔則被設定在了主線程。

五:變換

将事件序列中的對象或整個序列進行加工處理,轉換成不同的事件或事件序列。

①map(一對一的轉換)

Observable.just("images/logo.png") // 輸入類型 String
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String filePath) { // 參數類型 String
            return getBitmapFromPath(filePath); // 傳回類型 Bitmap
        }
    })
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) { // 參數類型 Bitmap
            showBitmap(bitmap);
        }
    });
           

Func1 和 Action1 非常相似,是 RxJava 的一個接口,用于包裝含有一個參數的方法。 Func1 和 Action 的差別在于, Func1 包裝的是有傳回值的方法。

map() 方法将參數中的 String 對象轉換成一個 Bitmap 對象後傳回,而在經過 map() 方法後,事件的參數類型也由 String 轉為了 Bitmap。

相當于:

Observable.just(“images/logo.png”)

轉換成

Observable.just(bitmap)

②flatMap(一對多的轉換)

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);
           

flatMap() 的原理是這樣的:

1. 使用傳入的事件對象建立一個 Observable 對象;

2. 并不發送這個 Observable, 而是将它激活,于是它開始發送事件;

3. 每一個建立出來的 Observable 發送的事件,都被彙入同一個 Observable ,而這個 Observable 負責将這些事件統一交給 Subscriber 的回調方法。

對于上面的例子,目的是把每一個學生的課程列印出來(一個學生有多個課程)。

可以看到flatMap方法使用傳入Student事件對象,建立并傳回Observable對象。多次執行flatMap後會傳回很多個Observable,最後每一個Observable都會被彙入到同一個新的Observable中,而這個 新的Observable 負責将這些事件統一交給 Subscriber 的回調方法;

…剩下的很多常用的操作符,以後再做補充…

傳統嵌套網絡請求,用rxjava可以鍊式表達:rxjava+retrofit

networkClient.token() // 傳回 Observable<String>,在訂閱時請求 token,并在響應後發送 token
    .flatMap(new Func1<String, Observable<Messages>>() {
        @Override
        public Observable<Messages> call(String token) {
            // 傳回 Observable<Messages>,在訂閱時請求消息清單,并在響應後發送請求到的消息清單
            return networkClient.messages();
        }
    })
    .subscribe(new Action1<Messages>() {
        @Override
        public void call(Messages messages) {
            // 處理顯示消息清單
            showMessages(messages);
        }
    });
           

六:變換原理 lift()

這些變換雖然功能各有不同,但實質上都是針對事件序列的處理和再發送。類型轉換都是通過lift()這個方法來實作的。

首先看lift()的核心源碼

public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);//傳進來的subscriber會被進行一輪包裝(變換)成newSubscriber 
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);//這個onSubscribe指的是原始Observable内的那個onSubscribe,不屬于目前正在建立的這個Observable。
        }
    });
}
           

可以看到lift()方法,傳入一個Operator參數,傳回一個Observable對象;這個傳回的Observable對象,是通過create生成的。

原始有一個Observable和一個OnSubscriber,經過lift()以後,又産生了一個新的Observable和一個新的OnSubscriber。那麼新的Observable被訂閱的時候,就會觸發新的OnSubscriber的call()方法,也就是上面代碼那一個call方法。在call方法裡面傳入的subscriber會被變換包裝成新的subscriber,緊接着先調用新的subscriber的onStart()方法,接着調用原始onSubscribe的call方法,在裡面正式調用新subscriber的回調方法。也就是說,新的Observable僅僅起一個代理的功能,最終的事件分發,還是在原始Observable中。(隻不過,在開發中,我們完全可以了解一個Observable被lift()後依舊是一個簡單的僅僅事件類型變換了的Observable)

舉一個栗子:

将事件中的 Integer 對象轉換成 String

observable.lift(new Observable.Operator<String, Integer>() {
    @Override
    public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
        // 将事件序列中的 Integer 對象轉換為 String 對象
        return new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                subscriber.onNext("" + integer);
            }
            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }
            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }
        };
    }
});
           

以上代碼中Operator的call方法的調用,是在lift()調用産生新的Observable被訂閱的時候,觸發新的OnSubscriber.call(subscriber)方法裡,對subscriber進行變換和包裝的方法。

這一段代碼的原意是要把Observable<Integer>轉換成Observable<String>,表面上我們看到的,也以為是Observable<Integer>被轉換成了Observable<String>,以為裡面的資料類型從Integer變成了String。

真相是:
原始Observable<Integer>沒變,然後生成了新的Observable<String>并傳回,我們手中持有的是新的這個Observable<String>,而這個新的Observable<String>能夠接收String類型的subscriber<String>的訂閱,而訂閱觸發的新的OnSubscriber調用call方法,在裡面對subscriber<String>進行轉換包裝成subscriber<Integer>,接着用原始的OnSubscriber調用call,把這個包裝後的subscriber<Integer>進行事件分發(調用onNext()..onCompleted()),因為原始Observable<Integer>是Integer類型,它能夠處理這個類型的subscriber<Integer>。那麼在我們眼中,就好像是把事件參數類型Integer轉變成了String,而事實是subscriber的類型做了變換。這些新的Observable其實都起了一個代理的作用,subscriber僅僅是經過了他們,最後都會被傳到最原始的observable裡面,被它執行(回調)。這就給了我們一個假象。
           

七:線程多次切換原理

RaJava是可以多次切換線程的,為什麼?

因為observeOn()方法指定的是Subscriber觀察者的線程,然而這個Subscriber并不是subscribe(Subscriber)訂閱傳入的這個Subscriber,而是observeOn執行時那個Observable所對應的Subscriber。(根據lift()變換原理,我們知道,rxjava鍊式每次變換操作後,Observable對象會變化,那麼它持有Subscriber對象也會變化(由下級包裝上傳過來的),是經過轉換包裝的,這個Subscriber也就是通過目前Observable對象的onSubscriber對象調用call(Subscriber)方法傳入的Subscriber對象)

observeOn() 指定的是它之後的操作所在的線程。是以如果有多次切換線程的需求,隻要在每個想要切換線程的位置調用一次 observeOn() 即可。