天天看點

用RxJava處理嵌套請求

用RxJava處理嵌套請求

網際網路應用開發中由于請求網絡資料頻繁,往往後面一個請求的參數是前面一個請求的結果,于是經常需要在前面一個請求的響應中去發送第二個請求,進而造成“請求嵌套”的問題。如果層次比較多,代碼可讀性和效率都是問題。本文首先從感性上介紹下RxJava,然後講解如何通過RxJava中的flatMap操作符來處理“嵌套請求”的問題

内容提要

  • RxJava簡單介紹
  • 嵌套請求舉例
  • 運用flatMap
  • map和flatMap
  • RxJava與Retrofit配合解決嵌套請求

這裡并不打算詳細介紹RxJava的用法和原理,這方面的文章已經很多了。這裡僅僅闡述本人對于RxJava的感性上的了解。先上一個圖:

我們都知道RxJava是基于觀察者模式的,但是和傳統的觀察者模式又有很大的差別。傳統的觀察者模式更注重訂閱和釋出這個動作,而RxJava的重點在于資料的“流動”。

如果我們把RxJava中的Observable看做一個盒子,那麼Observable就是把資料或者事件給裝進了這個易于拿取的盒子裡面,讓訂閱者(或者下一級别的盒子)可以拿到而處理。這樣一來,原來靜态的資料/事件就被流動起來了。

我們知道人類會在河流中建設大壩,其實我們可以把RxJava中的filter/map/merge等Oberservable操作符看做成資料流中的大壩,經過這個操作符的操作後,大壩資料流被過濾被合并被處理,進而靈活的對資料的流動進行管制,讓最終的使用者靈活的拿到。

以上就是我對RxJava的了解,深入的用法和原理大家請自行看網上的文章。

這裡開始進入正題,開始舉一個嵌套請求的例子。

比如我們下列接口:

  1. api/students/getAll (傳入班級的id獲得班級的學生數組,傳回值是list)
  2. api/courses/getAll (傳入Student的id獲得這個學生所上的課程,傳回值是List)

我們最終的目的是要列印班上所有同學分别所上的課程(大學,同班級每個學生選上的課不一樣),按照傳統Volley的做法,代碼大概是這樣子(Volley已經被封裝過)

private void getAllStudents(String id) {
        BaseRequest baseRequest = new BaseRequest();
        baseRequest.setClassId(id);
        String url = AppConfig.SERVER_URL + "api/students/getAll";

        final GsonRequest request = new GsonRequest<>(url, baseRequest, Response.class, new Response.Listener<Response>() {
            @Override
            public void onResponse(Response response) {
                if (response.getStatus() > 0) {
                    List<Student> studentList = response.getData();
                    for (Student student : studentList) {

                    }
                } else {
                    //error
                }
            }
        }, new Response.ErrorListener() {
            @Override
            public void onErrorResponse(VolleyError error) {
                //error
            }
        });
        MyVolley.startRequest(request);
    }

    private void getAllCourses(String id) {
        BaseRequest baseRequest = new BaseRequest();
        baseRequest.setStudentId(id);
        String url = AppConfig.SERVER_URL + "api/courses/getAll";

        final GsonRequest request = new GsonRequest<>(url, baseRequest, Response.class, new Response.Listener<Response>() {
            @Override
            public void onResponse(Response response) {
                if (response.getStatus() > 0) {
                    List<Course> courseList = response.getData();
                    for (Course course : courseList) {
                        //use
                    }
                } else {
                    //error
                }
            }
        }, new Response.ErrorListener() {
            @Override
            public void onErrorResponse(VolleyError error) {
                //error
            }
        });
        MyVolley.startRequest(request);
    }
           

顯然第一個請求的響應中獲得的資料是一個List,正對每一個List中的item要再次發送第二個請求,在第二個請求中獲得最終的結果。這就是一個嵌套請求。這會有兩個問題:

  • 目前來看并不複雜,如果嵌套層次多了,會造成代碼越來越混亂
  • 寫出來的重複代碼太多

現在我們可以放出RxJava大法了,flatMap是一個Observable的操作符,接受一個Func1閉包,這個閉包的第一個函數是待操作的上一個資料流中的資料類型,第二個是這個flatMap操作完成後傳回的資料類型的被封裝的Observable。說白了就是講一個多級數列“拍扁”成了一個一級數列。

按照上面的列子,flatMap将接受student後然後擷取course的這個二維過程給線性化了,變成了一個可觀測的連續的資料流。

于是代碼是:

ConnectionBase.getApiService2()
                .getStudents(101)
                .flatMap(new Func1<Student, Observable<Course>>() {
                    @Override
                    public Observable<Course> call(Student student) {
                        return ConnectionBase.getApiService2().getAllCourse(student.getId());
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Course>() {
                    @Override
                    public void call(Course course) {
                        //use the Course
                    }
                });
           

是不是代碼簡潔的讓你看不懂了?别急,這裡面的getStutent和getAllCourse是ConnectionBase.getApiService2()的兩個方法,他內建了Retrofit2用來将請求的網絡資料轉化成Observable,最後一節将介紹,這裡先不關注。

我們所要關注的是以上代碼的流程。

首先getStudent傳入了班級id(101)傳回了Observable,然後鍊式調用flatMap操作符對這個Observable進行變換處理,針對每一個發射出來的Student進行再次請求 ConnectionBase.getApiService2().getAllCourse進而傳回Observable,最後對這個 ConnectionBase.getApiService2().getAllCourse進行訂閱,即subscribe方法,再Action1這個閉包的回調中使用course。

flatMap的作用就是對傳入的對象進行處理,傳回下一級所要的對象的Observable包裝。

FuncX和ActionX的差別。FuncX包裝的是有傳回值的方法,用于Observable的變換、組合等等;ActionX用于包裝無傳回值的方法,用于subscribe方法的閉包參數。Func1有兩個入參,前者是原始的參數類型,後者是傳回值類型;而Action1隻有一個入參,就是傳入的被消費的資料類型。

subscribeOn(Schedulers.io()).observeOn(AndroidScheduler.mainThread())是最常用的方式,背景讀取資料,主線程更新界面。subScribeOn指在哪個線程發射資料,observeOn是指在哪裡消費資料。由于最終的Course要重新整理界面,必須要在主線程上調用更新view的方法,是以observeOn(AndroidScheduler.mainThread())是至關重要的。

運用flatMap的地方也是可以用map的,但是是有差別的。先看下map操作符的用法:

ConnectionBase.getApiService2()
                .getStudents(101)
                .map(new Func1<Student>, Course>() {
                    @Override
                    public Course call(Student student) {
                        return conventStudentToCourse();// has problem
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Course>() {
                    @Override
                    public void call(Course course) {
                        //use the Course
                    }
                });
           

可以看到map也是接受一個Func1閉包,但是這個閉包的第二個參數即傳回值參數類型并不是一個被包裝的Observable,而是實際的原始類型,由于call的傳回值是Course,是以conventStudentToCourse這裡就不能用Retrofit2的方式傳回一個Observable了。

是以這裡是有一個問題的,對于這種嵌套的網絡請求,由于接到上端資料流到處理後将結果資料放入下端資料流是一個異步的過程,而conventStudentToCourse這種直接将Student轉化為Course是沒法做到異步的,因為沒有回調方法。那麼這種情況,最好還是用flatMap并通過retrofit的方式來擷取Observable。要知道,Rxjava的一個精髓就是“異步”。

那麼到底map和flatMap有什麼差別,或者說什麼時候使用map什麼時候使用flatMap呢?

flatMap() 和 map() 有一個相同點:它也是把傳入的參數轉化之後傳回另一個對象。但需要注意,和 map() 不同的是, flatMap() 中傳回的是個 Observable 對象,并且這個 Observable 對象并不是被直接發送到了 Subscriber 的回調方法中。

首先,如果你需要将一個類型的對象經過處理(非異步)直接轉化成下一個類型,推薦用map,否則的話就用flatMap。

其次,如果你需要在進行中加入容錯的機制(特别是你自己封裝基于RxJava的網絡請求架構),推薦用flatMap。

比如将一個File[] jsonFile中每個File轉換成String,用map的話代碼為:

Observable.from(jsonFile).map(new Func1<File, String>() {
    @Override public String call(File file) {
        try {
            return new Gson().toJson(new FileReader(file), Object.class);
        } catch (FileNotFoundException e) {
            // So Exception. What to do ?
        }
        return null; // Not good :(
    }
});
           

可以看到這裡在出現錯誤的時候直接抛出異常,這樣的處理其實并不好,特别如果你自己封裝架構,這個異常不大好去抓取。

如果用flatMap,由于flatMap的閉包傳回值是一個Observable,是以我們可以在這個閉包的call中通過Observable.create的方式來建立Observable,而要知道create方法是可以控制資料流下端的Subscriber的,即可以調用onNext/onCompete/onError方法。如果出現異常,我們直接調用subscribe.onError即可,封裝架構也很好感覺。代碼大緻如下:

Observable.from(jsonFile).flatMap(new Func1<File, Observable<String>>() {
    @Override public Observable<String> call(final File file) {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override public void call(Subscriber<? super String> subscriber) {
                try {
                    String json = new Gson().toJson(new FileReader(file), Object.class);

                    subscriber.onNext(json);
                    subscriber.onCompleted();
                } catch (FileNotFoundException e) {
                    subscriber.onError(e);
                }
            }
        });
    }
});
           

這裡該讨論Retrofit了。可以說Retrofit就是為了RxJava而生的。如果你的項目之前在網絡請求架構用的是Volley或者自己封裝Http請求和TCP/IP,而現在你看到了Retrofit這個架構後想使用起來,我可以負責任的跟你說,如果你的項目中沒有使用RxJava的話,使用Retrofit和Volley是沒有差別的!要用Retrofit的話,就最好或者說強烈建議也使用RxJava進行程式設計。

Retrofit有callback和Observable兩種模式,前者就像傳統的Volley一樣,有successs和fail的回調方法,我們在success回調方法中處理結果;而Observable模式是将請求回來的資料由Retrofit架構自動的幫你加了一個盒子,即自動幫你裝配成了含有這個資料的Observable,供你使用RxJava的操作符随意靈活的進行變換。

callback模式的Retrofit是這樣建立的:

retrofit = new Retrofit.Builder()
                .baseUrl(SERVER_URL)
                .addConverterFactory(GsonConverterFactory.create(gson))
                .build();
           

Observable模式是這樣子建立的:

retrofit2 = new Retrofit.Builder()
                .baseUrl(SERVER_URL)
                .addConverterFactory(GsonConverterFactory.create(gson))
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .build();
           

即addCallAdapterFactory這個方法在起作用,在RxJavaCallAdapterFactory的源碼注釋中可以看到這麼一句話:

Response wrapped body (e.g., {@code Observable<Response>}) calls {@code onNext} with a {@link Response} object for all HTTP responses and calls {@code onError} with {@link IOException} for network errors

即它将傳回值body為包裹上了一層“Observable”