天天看點

什麼是函數響應式程式設計(Java&Android版本)

什麼是函數響應式程式設計(Java&Android版本)

——歡迎轉載,請注明出處 ,未經本人同意請勿用于商業用途,謝謝——

原文連結:​​http://www.bignerdranch.com/blog/what-is-functional-reactive-programming/​​

本文Gitbooks連結:​​http://asce1885.gitbooks.io/android-rd-senior-advanced/content/​​

函數響應式程式設計(FRP)為解決現代程式設計問題提供了全新的視角。一旦了解它,可以極大地簡化你的項目,特别是處理嵌套回調的異步事件,複雜的清單過濾和變換,或者時間相關問題。

我将盡量跳過對函數響應式程式設計學院式的解釋(網絡上已經有很多),并重點從實用的角度幫你了解什麼是函數響應式程式設計,以及工作中怎麼應用它。本文将圍繞函數響應式程式設計的一個具體實作​​RxJava​​,它可用于Java和Android。

開始

我們以一個真實的例子來開始講解函數響應式程式設計怎麼提高我們代碼的可讀性。我們的任務是通過查詢​​GitHub的API​​​,首先擷取使用者清單,然後請求每個使用者的詳細資訊。這個過程包括兩個web 服務端點:

​​​https://api.github.com/users​​​ - 擷取使用者清單;

​​​https://api.github.com/users/​​​{username} -擷取特定使用者的詳細資訊,例如​​https://api.github.com/users/mutexkid​​。

舊的風格

下面例子你可能已經很熟悉了:它調用web service,使用回調接口将成功的結果傳遞給下一個web service請求,同時定義另一個成功回調,然後發起下一個web service請求。你可以看到,這會導緻兩層嵌套的回調:

//The "Nested Callbacks" Way
    public void fetchUserDetails() {
        //first, request the users...
        mService.requestUsers(new Callback<GithubUsersResponse>() {
            @Override
            public void success(final GithubUsersResponse githubUsersResponse,
                                final Response response) {
                Timber.i(TAG, "Request Users request completed");
                final synchronized List<GithubUserDetail> githubUserDetails = new ArrayList<GithubUserDetail>();
                //next, loop over each item in the response
                for (GithubUserDetail githubUserDetail : githubUsersResponse) {
                    //request a detail object for that user
                    mService.requestUserDetails(githubUserDetail.mLogin,
                                                new Callback<GithubUserDetail>() {
                        @Override
                        public void success(GithubUserDetail githubUserDetail,
                                            Response response) {
                            Log.i("User Detail request completed for user : " + githubUserDetail.mLogin);
                            githubUserDetails.add(githubUserDetail);
                            if (githubUserDetails.size() == githubUsersResponse.mGithubUsers.size()) {
                                //we've downloaded'em all - notify all who are interested!
                                mBus.post(new UserDetailsLoadedCompleteEvent(githubUserDetails));
                            }
                        }

                        @Override
                        public void failure(RetrofitError error) {
                            Log.e(TAG, "Request User Detail Failed!!!!", error);
                        }
                    });
                }
            }

            @Override
            public void failure(RetrofitError error) {
                Log.e(TAG, "Request User Failed!!!!", error);
            }
        });
    }      

盡管這不是最差的代碼-至少它是異步的,是以在等待每個請求完成的時候不會阻塞-但由于代碼複雜(增加更多層次的回調代碼複雜度将呈指數級增長)是以遠非理想的代碼。當我們不可避免要修改代碼時(在前面的web service調用中,我們依賴前一次的回調狀态,是以它不适用于子產品化或者修改要傳遞給下一個回調的資料)也遠非容易的工作。我們親切的稱這種情況為​​“回調地獄”​​。

RxJava的方式

下面讓我們看看使用RxJava如何實作相同的功能:

public void rxFetchUserDetails() {
        //request the users
        mService.rxRequestUsers().concatMap(Observable::from)
        .concatMap((GithubUser githubUser) ->
                        //request the details for
        //accumulate them as a list
        .toList()
        //define which threads information will be passed on
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        //post them on an eventbus
        .subscribe(githubUserDetails -> {
            EventBus.getDefault().post(new;
    }      

如你所見,使用函數響應式程式設計模型我們完全擺脫了回調,并最終得到了更短小的程式。讓我們從函數響應式程式設計的基本定義開始慢慢解釋到底發生了什麼,并逐漸了解上面的代碼,這些代碼托管在​​GitHub上面​​。

從根本上講,函數響應式程式設計是在觀察者模式的基礎上,增加對Observables發送的資料流進行操縱和變換的功能。在上面的例子中,Observables是我們的資料流通所在的管道。

回顧一下,​​觀察者模式​​包含兩個角色:一個Observable和一個或者多個Observers。Observable發送事件,而Observer訂閱和接收這些事件。在上面的例子中,.subscribe()函數用于給Observable添加一個Observer,并建立一個請求。

建構一個Observable管道

對Observable管道的每個操作都将傳回一個新的Observable,這個新的Observable的内容要麼和操作前相同,要麼就是經過轉換的。這種方式使得我們可以對任務進行分解,并把事件流分解成小的操作,接着把這些Observables拼接起來進而完成更複雜的行為或者重用管道中的每個獨立的單元。我們對Observable的每個方法調用會被加入到總的管道中以便我們的資料在其中流動。

下面首先讓我們從搭建一個Observable開始,來看一個具體的例子:

Observable<String> sentenceObservable = Observable.from(“this”, “is”, “a”, “sentence”);      

這樣我們就定義好了管道的第一個部分:Observable。在其中流通的資料是一個字元串序列。首先要認識到的是這是沒有實作任何功能的非阻塞代碼,僅僅定義了我們想要完成什麼事情。Observable隻有在我們“訂閱”它之後才會開始工作,也就是說給它注冊一個Observer之後。

Observable.subscribe(new Action1<String>() {
          @Override
          public void call(String s) {
                System.out.println(s);
          }
 });      

到這一步Observable才會發送由每個獨立的Observable的from()函數添加的資料塊。管道會持續發送Observables直到所有Observables都被處理完成。

變換資料流

現在我們得到正在發送的字元串流,我們可以按照需要對這些資料流進行變換,并建立更複雜的行為。

Observable<String> sentenceObservable = Observable.from(“this”, “is”, “a”, “sentence”);

sentenceObservable.map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                return s.toUpperCase() + " ";
            }
        })
.toList()
.map(new Func1<List<String>, String>() {
            @Override
            public String call(List<String> strings) {
                Collections.reverse(strings);
                return strings.toString();
            }
        })
//subscribe to the stream of Observables
.subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });      

一旦Observable被訂閱了,我們會得到“SENTENCE A IS THIS”。上面調用的​​.map函數​​接受Func1類的對象,該類有兩個範型類型參數:一個是輸入類型(前一個Observable的内容),另一個是輸出類型(在這個例子中,是一個經過大寫轉換,格式化并用一個新的Observable執行個體包裝的字元串,最終被傳遞給下一個函數)。如你所見,我們通過可重用的管道組合實作更複雜的功能。

上面的例子中,我們還可以使用Java8的lambda表達式來進一步簡化代碼:

Observable.just("this", "is", "a", "sentence").map(s -> s.toUpperCase() + " ").toList().map(strings -> {
            Collections.reverse(strings);
            return strings.toString();
        });      

在subscribe函數中,我們傳遞Action1類對象作為參數,并以String類型作為範型參數。這定義了訂閱者的行為,當被觀察者發送最後一個事件後,處理後的字元串就被接收到了。這是.subscribe()函數最簡單的重載形式(參見​​https://github.com/ReactiveX/RxJava/wiki/Observable#establishing-subscribers​​可以看到更複雜的函數重載簽名)。

這個例子展示了變換函數.map()和聚合函數.toList(),在操縱資料流的能力方面這僅僅是冰山一角(所有可用的資料流操作函數可見​​https://github.com/ReactiveX/RxJava/wiki​​),但它顯示了基本概念:在函數響應式程式設計中,我們可以通過實作了資料轉換或者資料操作功能的管道中獨立的單元來轉換資料流。根據需要我們可以在其他由Observables組成的管道複用這些獨立的單元。通過把這些Observable單元拼接在一起,我們可以組成更複雜的特性,但同時保持它們作為易于了解和可修改的可組合邏輯小單元。

使用Scheduler管理線程

在web service例子中,我們展示了如何使用RxJava發起網絡請求。我們談及轉換,聚合和訂閱Observable資料流,但我們沒有談及Observable資料流的web請求是怎樣實作異步的。

這就屬于FRP程式設計模型如何調用Scheduler的範疇了-該政策定義了Observable流事件在哪個線程中發生,以及訂閱者在哪個線程消費Observable的處理結果。在web service例子中,我們希望請求在背景線程中進行,而訂閱行為發生在主線程中,是以我們如下定義:

.subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        //post them on an eventbus
        .subscribe(githubUserDetails -> {
            EventBus.getDefault().post(new UserDetailsLoadedCompleteEvent(githubUserDetails));
        });      

Observable.subscribeOn(Scheduler scheduler)函數指定Observable的工作需要在指定的Scheduler線程中執行。Observable.observeOn(Scheduler scheduler)指定Observable在哪個Scheduler線程觸發訂閱者們的onNext(),onCompleted(),和onError()函數,并調用Observable的observeOn()函數,傳遞正确的Scheduler給它。

下面是可能會用到Scheduler:

  • Schedulers.computation():用于計算型工作例如事件循環和回調處理,不要在I/O中使用這個函數(應該使用Schedulers.io()函數);
  • Schedulers.from(executor):使用指定的Executor作為Scheduler;
  • Schedulers.immediate():在目前線程中立即開始執行任務;
  • Schedulers.io():用于I/O密集型工作例如阻塞I/O的異步操作,這個排程器由一個會随需增長的線程池支援;對于一般的計算工作,使用Schedulers.computation();
  • Schedulers.newThread():為每個工作單元建立一個新的線程;
  • Schedulers.test():用于測試目的,支援單元測試的進階事件;
  • Schedulers.trampoline():在目前線程中的工作放入隊列中排隊,并依次操作。

通過設定observeOn和subscribeOn排程器,我們定義了網絡請求使用哪個線程(Schedulers.newThread())。

下一步

我們已經在本文中涵蓋了很多基礎内容,到這裡你應該對函數響應式程式設計如何工作有了很好的認識。請檢視并了解本文介紹的工程,它托管在​​GitHub上面​​​,閱讀​​RxJava文檔​​​并檢出​​rxjava-koans工程​​,以測試驅動的方式掌握函數響應式程式設計範型。