天天看点

RXJava学习笔记(3)

过滤操作

通过上篇的学习,我们知道了如何去转换一个Observable对象成我们想要的格式,但是我们想直接省去if else而拿到想要的数据,这个怎么做呢?那么我们就在这篇文章里来系统的学习一下吧

  • Debounce操作符
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i=;i<;i++) {
            subscriber.onNext(i);
            int time=;
            if (i%==) {
                time=;
            }
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        subscriber.onCompleted();
    }
    //过滤掉不足300ms的
}).debounce(, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.d("SampleFilteringActivity", "integer:" + integer);
    }
});
           

运行结果

com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
           

debounce操作符是

在源Observable发射之后,在规定的时间内没有别的结果产生,则把这个结果提交给订阅者。由于最后一条数据没有其他数据覆盖,所以订阅者都可以收到

除了在时间范畴上进行控制,还可以根据函数来限流

Observable.just(, , , , , , , , , ).debounce(new Func1<Integer, Observable<String>>() {
    @Override
    public Observable<String> call(final Integer integer) {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                if (integer%==) {
                    subscriber.onCompleted();
                }
            }
        });
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.d("SampleFilteringActivity", "integer:" + integer);
    }
});
           

运行结果

com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
           

只要能被4整除,订阅者就可以收到。最后一个值情况例外

throttleWithTimeout在时间限流上功能跟debounce一致

  • Distinct操作符
Observable.just(, , , , , ).distinct().subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.d("SampleFilteringActivity", "integer:" + integer);
    }
});
           

运行结果

com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
           

功能就是

去重

如果是

distinctUntilChangedObserver

他不是完全过滤,只是连续N个相同的数据,仅仅保留一个,后面的他就不管了

,例如

Observable.just(, , , , , , , , ).distinctUntilChanged().subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.d("SampleFilteringActivity", "integer:" + integer);
    }
});
           

运行结果

com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
           

下面介绍多点功能差不多的操作符:

  • elementAt操作符
  • take操作符
  • first操作符
  • last操作符
  • takeFirst操作符
  • takeLast操作符
  • skip
  • skipLast

    这里简单说明下

    elementAt是只发射第N个元素

    take是只发射前面N个元素

    first是只发射第一个元素,等同于elementAt(0)

    last是只发射最后一个元素

    takeFirst类似于take,也类似于first,也是只发射前面N个元素。他和take的区别在于如果Observable没有发射数据,take会抛出NoSuchElementException异常,而takeFirst不会,他会产生一个没有onNext只有onCompleted的空Observable

    takeLast是只发射最后N个元素

    skip是跳过之前skip个元素,直接返回后面的所有元素

    skipLast是跳过最后的几个元素,直接返回之前的所有元素

  • Filter操作符
Observable.just(, , , , , ).filter(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer integer) {
        return integer>;
    }
}).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.d("SampleFilteringActivity", "integer:" + integer);
    }
});
           

运行结果

com.renyu.rxdemo D/SampleFilteringActivity: integer:
com.renyu.rxdemo D/SampleFilteringActivity: integer:
           

对源Observable发射出来的数据按照一定条件进行过滤

  • OfType操作符
Observable.just("1", , ).ofType(String.class).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.d("SampleFilteringActivity", s);
    }
});
           

运行结果

com.renyu.rxdemo D/SampleFilteringActivity: 
           

类似于filter但是又不同,

他是按照数据类型进行过滤

,此处就是找出String类型的发射结果

  • single操作符
Observable.just(, , , , , ).single(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer integer) {
        //取大于4的唯一一个元素,否则抛出异常
        return integer>=;
    }
}).subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onNext(Integer integer) {
        Log.d("SampleFilteringActivity", "integer:" + integer);
    }
});
           

运行结果

com.renyu.rxdemo D/SampleFilteringActivity: integer:
           

同样是对源Observable发射出的数据进行判断,如果返回的过滤结果数量不是1,他就抛java.lang.IllegalArgumentException: Sequence contains too many elements

  • ignoreElements

    忽略掉所有的返回结果,仅仅保留onError和onCompleted

  • Sample
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        for (int i=;i<;i++) {
            subscriber.onNext(""+i);
            try {
                Thread.sleep();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}).sample(, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.newThread()).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.d("SampleFilteringActivity", s);
    }
});
           

运行结果

com.renyu.rxdemo D/SampleFilteringActivity: 
com.renyu.rxdemo D/SampleFilteringActivity: 
com.renyu.rxdemo D/SampleFilteringActivity: 
com.renyu.rxdemo D/SampleFilteringActivity: 
           

sample是

按照指定的时间间隔去定时扫描Observable发射出的最后一条数据

,通过下图来说明

RXJava学习笔记(3)

2.2s的时候正好数据2发射完成,4.4s的时候数据5完成,以此类推,最后一条数据9发出之后没有发生改变,直到8.8s被调用显示

主要参考文章

  • 给 Android 开发者的 RxJava 详解 扔物线大神的文章,我的RXJava启蒙文章
  • 木水川的博客 对RXJava操作符做了很详细的说明以及提供详细的示例
  • ReactiveX文档中文翻译 mcxiaoke翻译的RXJava中文文档
  • Android RxJava使用介绍 job_hesc对RXJava操作符介绍以及简单的示例