天天看点

RxJava操作符之合并操作符

前面介绍了rxjava 错误处理,Do 系列操作符,今天继续介绍合并操作符,merge、concat、zip、startWith 。下面就以代码和日志,加上官网介绍来详细介绍。

话不多说,先上公共的代码

Observable<String> mStringObservable;

    Observable<String> mStringSecondObservable;

    Observable<Integer> mIntegerObservable;

    Observable<String> mStringErorObservable;

    Observer<String> mStringSubscriber;

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_important_operation);

        mStringErorObservable = Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                for (int i = 0; i < 5; i++) {
                    if(i == 3) {
                        emitter.onError(new Throwable("ERROR"));
                    }else{
                        emitter.onNext(i + "");
                    }
                }
                emitter.onComplete();
            }
        });

        mStringObservable = Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                for (int i = 0; i < 5; i++) {
                    emitter.onNext(i + "");
                }
                emitter.onComplete();
            }
        });

        mStringSecondObservable = Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                for (int i = 0; i < 5; i++) {
                    emitter.onNext(i + " StringSecond");
                }
                emitter.onComplete();
            }
        });

        mStringSubscriber = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        };

        mIntegerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 6; i++) {
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }
        });
    }
           

Merge

combine multiple Observables into one by merging their emissions

Merge may interleave the items emitted by the merged Observables (a similar operator, Concat, does not interleave items, but emits all of each source Observable’s items in turn before beginning to emit items from the next source Observable).

RxJava操作符之合并操作符

合并同类型的数据源(有可能会交错数据源,只是有这个可能,并不一定会交错),注意 :不同类型的数据源无法合并。对于这一规则,concat 、startWith都必须遵守。可以看到下面两个不同类型的数据源无法应用merge。

RxJava操作符之合并操作符

代码展示

public void merge(View view){
        mStringSecondObservable.mergeWith(mStringObservable).subscribe(mStringSubscriber);
    }
           

打印结果:

D/OtherImportantOperation: onNext: 0 StringSecond
D/OtherImportantOperation: onNext: 1 StringSecond
D/OtherImportantOperation: onNext: 2 StringSecond
D/OtherImportantOperation: onNext: 3 StringSecond
D/OtherImportantOperation: onNext: 4 StringSecond
D/OtherImportantOperation: onNext: 0
D/OtherImportantOperation: onNext: 1
D/OtherImportantOperation: onNext: 2
D/OtherImportantOperation: onNext: 3
D/OtherImportantOperation: onNext: 4
D/OtherImportantOperation: onComplete: 
           

结果分析:

打印了两种数据源,起到了合并的作用。

Concat 

emit the emissions from two or more Observables without interleaving them

RxJava操作符之合并操作符

RxJava implements this operator as 

concat

. There are variants of this operator that take between two and nine Observables as parameters, and that concatenate them in the order they appear in the parameter list. There is also a variant that takes as a parameter an Observable of Observables, and concatenates each of these Observables in the order that they are emitted.

按顺序合并数据源,并一一发射他们。功能和merge类似,就是它一定不会交错数据源。

代码展示:

public void concat(View view){
        mStringSecondObservable.mergeWith(mStringObservable).subscribe(mStringSubscriber);
    }
           

打印日志:

D/OtherImportantOperation: onSubscribe: 
D/OtherImportantOperation: onNext: 0 StringSecond
D/OtherImportantOperation: onNext: 1 StringSecond
D/OtherImportantOperation: onNext: 2 StringSecond
D/OtherImportantOperation: onNext: 3 StringSecond
D/OtherImportantOperation: onNext: 4 StringSecond
D/OtherImportantOperation: onNext: 0
D/OtherImportantOperation: onNext: 1
D/OtherImportantOperation: onNext: 2
D/OtherImportantOperation: onNext: 3
D/OtherImportantOperation: onNext: 4
D/OtherImportantOperation: onComplete: 
           

结果分析:

和预想一致,没有进行数据交错。

startWith

emit a specified sequence of items before beginning to emit the items from the source Observable

在发射的数据源前面加一个数据,合并到之前的数据源。

RxJava操作符之合并操作符

其效果和concat 、merge 很多时候可以等价,这里就仅展示其用法:

public void startWith(View view){
        mStringObservable.startWith(mStringSecondObservable).subscribe(mStringSubscriber);
    }
           

Zip 操作

把多个不同类型的数据源合并为一个数据源进行发射。

RxJava操作符之合并操作符

运用场景:假如某个设备展示界面需要展示设备和通道信息,而服务端只给你提供了单独获取各自的接口,这时候你需要同时获取这些信息并展示出来。这时候,zip操作符就很有用了,因为通道和设备并不是从属关系,不是同一类型,无法用上面的合并操作,这时候就可以用zip这种合并不同类型数据源的操作符。

解决思路:我们建一个deviceAndChannle 类,包含device 和 channel 两个对象,然后获取到设备和通道信息后,通过zip 操作把他们组装成deviceAndChannle 发射给观察者,后面就可以在onNext 中处理device 和channel 了。

嵌套网络请求处理

场景:假如有2个请求A 和 B,只有A请求成功后B才能进行请求,那么我们如何使用rxJava完成这样的操作呢?

操作符flatMap:

@Override
    public Observable<List<AmortizedBillModel>> useCoupon(final CouponRequestModel couponRequestModel) {
        final SaleCardApi saleCardApi = HttpClient.getInstance().create(SaleCardApi.class);

        return saleCardApi.userCoupon(couponRequestModel)
//                .filter(new Func1<BaseResp<Object>, Boolean>() {
//            @Override
//            public Boolean call(BaseResp<Object> objectBaseResp) {
//                Object object = objectBaseResp.getData();
//
//                return objectBaseResp.getData() != null;
//            }
//        })
                .flatMap(new Func1<BaseResp<Object>, Observable<List<AmortizedBillModel>>>() {
            @Override
            public Observable<List<AmortizedBillModel>> call(BaseResp<Object> objectBaseResp) {
                Object object = objectBaseResp.getData();
                if (object != null){
                    return saleCardApi.getBillPrGenerateRespAfterUseCoupon(couponRequestModel.product_id, couponRequestModel.coupon_id + "", couponRequestModel.bill_id).map(new Func1<BaseResp<BillPreGeneratedResponse>, List<AmortizedBillModel>>() {
                        @Override
                        public List<AmortizedBillModel> call(BaseResp<BillPreGeneratedResponse> billPreGeneratedResponseBaseResp) {
                            return billPreGeneratedResponseBaseResp.getData().getAmortizedBillModelList();
                        }
                    });
                }else {
                    LogUtil.debugLog("objectBaseResp:" + objectBaseResp.getMessage());
                    return Observable.error(new RuntimeException(objectBaseResp.getMessage()));
                }


            }
        });
    }
           

这里先使用优惠券,然后判断是否使用成功,成功后进行下一个请求获取账单,失败则发送一个错误;这就是处理思路。注意几点:我们在使用flatMap 时,要先进行A请求,在进行B请求,并对A的结果进行判断;比如上面的请求就是如果获取的data为空,则认为失败,并用Observale 创建一个error的 数据流,并把错误信息放进去。这时候,rxJava 会自动到subcribe 的onError 方法中,并在主线程中拿到error 信息并toast。这样我们的数据并没有断掉,达到了一个完整的rxJava 链式调用。

如果说:我们想在第一个请求完成后,不论成功与否都进行toast,则可以把第一个请求转换到主线程,然后再把第二个线程在转换到io 线程,如下:

mBooleanObservable = Observable.fromArray(false)
 
public void flatMap(View view){
        // 如果想用flatMap 实现 网络请求遵循一定的逻辑,比如A 网络请求成功后,在去请求B,这时候一般要结合
        // filter 操作符;下面的例子是 mBooleanObservable 是进行了A 网络请求得到的结果,进行filter条件过滤
        // 通过后在执行 B mIntegerObservable2;这时候在通过 flatMap 进行数据流类型转换;
        mBooleanObservable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
//                .filter(new Predicate<Boolean>() {
//            @Override
//            public boolean test(Boolean aBoolean) throws Exception {
//                return aBoolean;
//            }
//        })
                .doOnNext(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                Log.d("flatMap", "doOnNext" + aBoolean);
            }
        })
                //再次切换到io 线程
                .subscribeOn(Schedulers.io())

                .flatMap(new Function<Boolean, ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> apply(Boolean aBoolean) throws Exception {
                if (aBoolean){
                    return mIntegerObservable2;
                }else {
                    return Observable.error(new RuntimeException("发生了错误"));
                }

            }
        })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.d("flatMap", "flatMap" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d("flatMap", "flatMap" + e.getMessage());
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                Log.d("flatMap", "flatMap is over");
            }
        });
           

看上面的代码, 先进行第一个网络请求,并对结果处理,然后再次转换到io 线程,并请求下一个结果。

filter  :通过filter 进行结果过滤,这样可以把不符合对结果过滤掉,只处理符合要求的对象。比如说:我们不关心第一个请求的错误原因就可以用filter 进行过滤。

其他替代方案?开一个线程,然后在此线程中依次进行A 、B网络请求;例如,我们在进行对设备进行操作时。

flatMap 的真正的含义 官方文档 :

flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。

Map 得到的不再是一个 observable 对象,flatMap 得到的仍然是一个obserable对象;这是最最关键的区别,你就从这点出来,如果需要得到具体使用的,一般是map;需要再次处理的用flatMap;map 和FlatMap 都属于transform 操作符,都有转化对象的能力;

同时,flatMap 的核心能力是把一个数据流对象进行拆解为多个数据流对象,相当于是把数据流给铺平,然后在分别发送每个数据流,这就是flatMap 的本质。

上面所讲的flatMap 解决网络嵌套问题,只是一个特例而已。当第一个数据流内部没有多个数据流时,铺平的操作就是进行了一次数据流转化,然后在进行后续的操作,由于这个过程是顺序的,所以就解决了请求嵌套的问题。

看一下rxJava的flatMap的官方解释:// 参考的 文章

FlatMap,英文水平有限,如果翻译有不当之处,欢迎各种建议。

1、transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable。

翻译过来就是:由一个Observable发射一组任务,来将它们转成多个Observable。然后,铺平那些Observable到一个Observable里面。听起来是不是很难理解?因为这是硬翻译,来看看软翻译怎么说:

由一个Observable来触发执行,一组转换任务。做法是:先将这些任务以及它们的子任务,提出来。然后,再将这些任务合并到一个Observable里面。最后,由这个Observable,对这些任务进行遍历处理。。。如果还看不懂,没关系,下面还有更详细的解释。

2、The FlatMap operator transforms an Observable by applying a function that you specify to each item emitted by the source Observable, where that function returns an Observable that itself emits items. FlatMap then merges the emissions of these resulting Observables, emitting these merged results as its own sequence.

This method is useful, for example, when you have an Observable that emits a series of items that themselves have Observable members or are in other ways transformable into Observables, so that you can create a new Observable that emits the complete collection of items emitted by the sub-Observables of these items.

Note that FlatMap merges the emissions of these Observables, so that they may interleave.

In several of the language-specific implementations there is also an operator that does not interleave the emissions from the transformed Observables, but instead emits these emissions in strict order, often called ConcatMap or something similar.

FlatMap操作符,转换一个Observable的做法是,对每一个任务,都通过调用SourceObservable的方法来实现转换。这个方法由你实现,具体的转换逻辑,就在这个方法里面处理。并且,该方法会返回一个Observable,这个Observable又可以处理它自己的任务。FlatMap会合并所有的任务,即将一级任务,先全部转成二级任务,再遍历处理。

这个方法是很有用的。比如说,当你的Observable执行一组任务,或处理一组数据的同时。这些任务或数据,又包含有自己的任务,或数据。所以,你可以为每一个任务,创建一个Observable来处理,它们的二级任务。

 Rxjava 合并操作符缺陷

假如说:你需要获取两个数据源,但是这两个数据源没有继承关系,不是同一类型,

 ObservableSource<? > source1, ObservableSource<? > source2 ;这个时候,我们想要使用merge 、contact 就无法使用?

因为rxJava 规定数据源要是如下关系:

  • ObservableSource<? extends T> source1, ObservableSource<? extends T> source2 ,它们的基类是T ,才可以使用。

这一点对于concat 、 merge 操作符都是如此,那我们该如何办呢? 遇到不同的数据源时候,又不想写多个rxJava 请求,想执行一次线程切换即可。

很遗憾,没有太优雅的办法,只能让Observable 的泛型改成 Object 这种不是很好的办法,并在map的时候把每个observable 的数据存入本地,等到用的时候再去取;比如

接口定义
    
    /**
     * 获取关于我们数据
     * @return
     */
    Observable<Object> getAboutUsData();


    /**
     * 解析 本地json文件
     * @return
     */
    Observable<Object> getOJKEventRecords();

实现

   @Override
    public Observable<Object> getAboutusData() {

        AboutUsApi aboutUsApi = HttpClient.getInstance().create(AboutUsApi.class);
        return aboutUsApi.getAboutUsInfo().map(new Func1<BaseResp<AboutUsModel>, Object>() {
            @Override
            public AboutUsModel call(BaseResp<AboutUsModel> aboutUsModelBaseResp) {
                AboutUsModel aboutUsModel = aboutUsModelBaseResp.getData();
                
   // 重点这里把数据存入本地 等用到的时候再去取
SharePreferenceUtil.getInstance(DanaCepatApp.getInstance().getApplicationContext())
                        .setObject(SP_constant.HELP_ABOUT_US_MODEL, aboutUsModel);
                return aboutUsModel;
            }
        });

    }


    @Override
    public Observable<Object> getOJKEventRecords() {
        String json = getJson("ojkEventRecords.json", 
 App.getInstance().getApplicationContext());
        LogUtil.debugLog("json" + json);
        try {
            String str = new JSONObject(json).getString("ojkEventRecords");
            Gson gson = new Gson();
            Type type = new TypeToken<List<OJKEventRecordModel>>(){}.getType();
            List<OJKEventRecordModel> list = gson.fromJson(str, type);
            if (list != null) {
                 // 重点这里把数据存入本地 等用到的时候再去取SharePreferenceUtil.getInstance(DanaCepatApp.getInstance().getApplicationContext())
                        .setObject(OJK_EVENTSRECORDS, list);
            }
            return Observable.just(list).map(new Func1<List<OJKEventRecordModel>, Object>() {
                @Override
                public Object call(List<OJKEventRecordModel> ojkEventRecordModels) {
                    return ojkEventRecordModels;
                }
            });
        } catch (JSONException e) {
            e.printStackTrace();
        }
        return null;
    }


    public String getJson(String fileName, Context context) {
        //将json数据变成字符串
        StringBuilder stringBuilder = new StringBuilder();
        try {
            //获取assets资源管理器
            AssetManager assetManager = context.getAssets();
            //通过管理器打开文件并读取
            BufferedReader bf = new BufferedReader(new InputStreamReader(
                    assetManager.open(fileName)));
            String line;
            while ((line = bf.readLine()) != null) {
                stringBuilder.append(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return stringBuilder.toString();
    }
           

   // 重点这里把数据存入本地 等用到的时候再去取

SharePreferenceUtil.getInstance(DanaCepatApp.getInstance().getApplicationContext()) .setObject(SP_constant.HELP_ABOUT_US_MODEL, aboutUsModel);

 这里再获取到数据后把数据存入本地,这样就不用把结果抛到主线程中,再主线程中使用了。

ojkEventRecordObservale.mergeWith(aboutUsModelObservable).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new HaduSubscribe<Object>() {

                    @Override
                    public void onError(BaseException e) {
                        dismissBaseLoadingDialog();
                        mHttpFailString = e.getErrorDesc();
                    }

                    @Override
                    public void onCompleted() {
                        super.onCompleted();
                    }

                    @Override
                    public void onNext(Object o) {
                        super.onNext(o);
                        dismissBaseLoadingDialog();

                        mAboutUsModel = SharePreferenceUtil.getInstance(HelpActivity.this)
                                .getObject(SP_constant.HELP_ABOUT_US_MODEL, AboutUsModel.class);

                    }
                });
           

再 subscribe 中的 onNext 中取出之前存入的本地结果,在这里使用。

除了可以在map的时候存入本地再使用这种方式,我们还可以通过doOnNext 监听去实现,比如:

@RxLogObservable
public Observable<Data> getMergedData() {
  return Observable.merge(
    diskRepository.getData().subscribeOn(Schedulers.io()),
    networkRepository.getData()
      .doOnNext(new Action1<Data>() { 
        @Override 
        public void call(Data data) { 
          diskRepository.saveData(data); // <-- save to cache
        } 
      }).subscribeOn(Schedulers.io())
  );
}
           

另外,还有一种方式,新建一个bean ,把不同数据源作为这个bean的 属性,然后用zip 操作符来进行组合。这样其实也不太优雅,因为这几个数据源本来就是没有关联的data,这样强行把它们进行关联,其实不太好;总之,rxJava 再这种情况下,没有很好的处理方式。其实,这个时候应该考虑换线程池或者 AsyncTask 去完成,可能会更好。

参考文章

全面的rxjava https://www.jianshu.com/p/cd984dd5aae8  

rxJava 仍物线