天天看點

RxJava操作符系列三(上)

rxjava操作符系列傳送門

rxjava操作符源碼

https://github.com/xiehui999/fuseprogram

<a href="http://mp.weixin.qq.com/s?__biz=mza3mdmymjkzng==&amp;mid=2652262134&amp;idx=1&amp;sn=a3f8f3dc8ea48bb890f2e6c17c25e750&amp;chksm=84dc7161b3abf87755627edf94469ad7e78cbe76bab64baf055a9ac6562fd09d09099738b361&amp;scene=21#wechat_redirect">rxjava操作符系列一</a>

<a href="http://mobile.51cto.com/android-526235.htm">rxjava操作符系列二</a>

RxJava操作符系列三(上)

前言

在之前的文章,我們介紹了一些observable的建立以及資料轉換的操作符,其中的一些資料轉換的操作符了解還是有一定的難度的,但是相信如果敲一遍代碼并且修改各種參數的值,去觀察執行的日志,相信還是很容易的了解的。在官網,每個操作符都給出了圖例,如果你對文字的了解不夠清楚明白,也可以去參考圖示幫助自己了解。在這篇文章中,我們将介紹一些常見的過濾操作符,在rxjava中過濾操作符也是比較好了解的,好了,讓我們一起繼續開啟學習之旅吧。

filter

該操作符接收一個func1參數,我們可以在其中通過運用你自己的判斷條件去判斷我們要過濾的資料,當資料通過判斷條件後傳回true表示發射該項資料,否則就不發射,這樣就過濾出了我們想要的資料。如下,我們過濾出不能被2整除的數

integer[] ints = {1, 2, 3, 4, 5, 6, 7, 8, 9}; 

       observable observable = observable.from(ints).filter(new func1&lt;integer, boolean&gt;() { 

           @override 

           public boolean call(integer integer) { 

               return integer % 2 != 0;//傳回true,就不會過濾掉,會發射資料,過濾掉傳回false的值 

           } 

       }); 

       action1 action1 = new action1&lt;integer&gt;() { 

           public void call(integer i) { 

               log.e(tag, "call: "+i ); 

       }; 

       observable.subscribe(action1);  

輸出日志資訊

call: 1 

call: 3 

call: 5 

call: 7 

call: 9  

oftype

該操作符是filter操作符的一個特殊形式。它過濾一個observable隻傳回指定類型的資料,例如當資料源有字元串和int型資料時,我們想要過濾出字元串就可以使用這個操作符,如下示例代碼

observable.just(0, "one", 6, 4, "two", 8, "three", 1, "four", 0) 

.oftype(string.class) 

.subscribe(new subscriber&lt;string&gt;() { 

@override 

public void oncompleted() { 

log.e(tag, "oncompleted:oftype "); 

public void onerror(throwable e) { 

log.e(tag, "onerror:oftype "); 

public void onnext(string string) { 

log.e(tag, "onnext:oftype " + string); 

});  

onnext:oftype one 

onnext:oftype two 

onnext:oftype three 

onnext:oftype four 

oncompleted:oftype  

當然除了過濾基本類型的資料,也可以過濾自定義類型資料。

first

如果我們隻對observable發射的第一項資料,或者滿足某個條件的第一項資料感興趣,則可以使用first操作符。

observable.just(10, 11, 12, 13).first().subscribe(new action1() { 

            @override 

            public void call(integer integer) { 

                log.e(tag, integer+""); 

            } 

        });  

上面日志隻列印一個值10,當然我們也可以給first傳一個參數fun1,指定一個條件如下

observable.just(10, 11, 12, 13).first(new func1&lt;integer, boolean&gt;() { 

            public boolean call(integer integer) { 

                return integer &gt; 12; 

        }).subscribe(new action1&lt;integer&gt;() { 

此時輸出的資訊就是滿足integer &gt; 12的第一項資料13。

firstordefault

該操作符是first操作符的變形。主要是在沒有發射任何資料時發射一個你在參數中指定的預設值。如下,它有有兩個重載方法。

observable.just(11,12,13).firstordefault(10).subscribe(new action1&lt;object&gt;() { 

            public void call(object o) { 

                log.e(tag, o.tostring()); 

如果寫成上面的代碼,這個執行會和first效果一樣。因為沒有發射資料的時候才用到預設值,那麼我們将上面代碼更改如下,使用empty建立一個不發射任何資料但是正常終止的observable。

observable.empty().firstordefault(10).subscribe(new action1&lt;object&gt;() { 

發現此時輸出了資料10.該操作符還提供了兩個參數的重載方法firstordefault(t defaultvalue, func1 super t, boolean&gt; predicate)。我們可以增加一個條件。如下示例

observable.just(10,13,16).firstordefault(15, new func1&lt;integer, boolean&gt;() { 

                return integer&gt;20; 

                log.e(tag, ""+integer); 

此時資料源10,13,16都不滿足大于20,則此時将輸出預設值15,如果我們将資料源資料增加一個值22.那麼此時将不再輸出預設值,而是輸出22。

takefirst

該操作符與first操作符的差別就是如果原始observable沒有發射任何滿足條件的資料,first會抛出一個nosuchelementexception直接執行onerror(),而takefist會傳回一個空的observable(不調用onnext()但是會調用oncompleted)

如下面下面示例代碼

observable.just(10,11).filter(new func1&lt;integer, boolean&gt;() { 

        }).first().subscribe(new subscriber&lt;object&gt;() { 

            public void oncompleted() { 

                log.e(tag, "oncompleted: "); 

            public void onerror(throwable e) { 

                log.e(tag, "onerror: "+e.tostring()); 

            public void onnext(object o) { 

                log.e(tag, "onnext: "+o.tostring()); 

執行後輸出的資訊如下

onerror: java.util.nosuchelementexception: sequence contains no elements 

若此時用takefirst

observable.just(10,11).takefirst(new func1&lt;integer, boolean&gt;() { 

                log.e(tag, "call: takefirst" ); 

                return integer&gt;30; 

        }).subscribe(new subscriber&lt;object&gt;() { 

發現此時不會出現異常,而是執行了oncompleted()。

single

如果原始observable在完成之前不是正好發射一次資料,它會抛出一個nosuchelementexception,白話可以了解為發送資料是一項的話輸出此項的值,若是多個資料則抛出異常執行onerror()方法。

如下代碼

observable.just(10, 11, 12, 13).single().subscribe(new subscriber&lt;integer&gt;() { 

                 log.e(tag, "oncompleted"); 

                 log.e(tag, "onerror"+e.tostring()); 

            public void onnext(integer integer) { 

                 log.e(tag,  integer); 

輸出資訊

如果将上述代碼做下簡單更改

observable.just(10, 11, 12, 13).filter(new func1&lt;integer, boolean&gt;() { 

        }).subscribe(new subscriber&lt;integer&gt;() { 

此時會輸出資料13,因為此時通過filter後就隻有一條資料。single也有singleordefault(t)和singleordefault(t,func1)兩個變體,具體可以自己代碼測試差別。

last

該操作符與first意義相反,若我們隻對observable發射的最後一項資料,或者滿足某個條件的最後一項資料感興趣時使用該操作符。

示例代碼

observable.just(10, 11, 12, 13).last().subscribe(new action1&lt;integer&gt;() { 

               log.e(tag, "call: "+integer); 

執行後輸出13.它有一個重載方法可以指定條件,擷取滿足條件的最後一項資料的。将上面代碼修改如下

observable.just(10, 11, 12, 13).last(new func1&lt;integer, boolean&gt;() { 

                return integer &lt; 12; 

                log.e(tag, "call: "+integer); 

此時最終輸出資料就是11.該操作符和first一樣也有幾種變體,如lastordefault,takelast,具體效果可自己測試。

skip

該操作符是跳過之前的前幾項資料,然後再發射資料。

observable.range(1, 10).skip(6).subscribe(new action1&lt;integer&gt;() { 

               log.e(tag, "call: "+integer ); 

call: 8 

call: 9 

call: 10  

skip還有兩個重載方法.skip(long time, timeunit unit)預設是在computation排程器上執行,如果要有更新ui操作需要通過observeon方法指定為androidschedulers.mainthread(),當然還有一個重載方法skip(long time, timeunit unit, scheduler scheduler)可以指定排程器。注意的一點是這兩個重載方法的第一個參數不是跳過的資料數量,指的是時間。

observable.interval(500, timeunit.milliseconds) 

                .skip(2, timeunit.seconds) 

                .observeon(androidschedulers.mainthread()) 

                .subscribe(new subscriber&lt;long&gt;() { 

                    @override 

                    public void oncompleted() { 

                    } 

                    public void onerror(throwable e) { 

                    public void onnext(long along) { 

                        tv.append("\n" + along); 

                        if (along &gt; 10) { 

                            this.unsubscribe(); 

                        } 

                });  

如上代碼,通過interval每隔500毫秒産生一個資料,通過skip設定跳過時間為2秒。并且當資料大于10時解除訂閱。

skiplast

正好和skip 相反,忽略最後産生的n個資料項

observable.range(1, 10).skiplast(6).subscribe(new action1&lt;integer&gt;() { 

call: 2 

call: 4  

接下文 

本文作者:佚名

來源:51cto

繼續閱讀