天天看點

RxJava操作符系列二(下)

RxJava操作符系列二(下)

接上文

輸出日志資訊

call:2 concatmap rxnewthreadscheduler-5 

onnext: concatmap 101 concatmap 

call:2 concatmap rxnewthreadscheduler-6 

onnext: concatmap 102 concatmap 

call:2 concatmap rxnewthreadscheduler-7 

onnext: concatmap 103 concatmap 

oncompleted: concatmap  

通過該操作符和flatmap輸出的日志資訊,很容易看出flatmap并沒有保證資料源的順序性,但是concatmap操作符保證了資料源的順序性。在應用中,如果你對資料的順序性有要求的話,就需要使用concatmap。若沒有要求,二者皆可使用。

switchmap

當原始observable發射一個新的資料(observable)時,它将取消訂閱并停止監視産生執之前那個資料的observable,隻監視目前這一個.

integer[] integers = {1, 2, 3}; 

observable.from(integers).switchmap(new func1>() { 

            @override 

            public observable call(integer integer) { 

                log.e(tag, "call: switchmap" + thread.currentthread().getname()); 

                //如果不通過subscribeon(schedulers.newthread())在在子線程模拟并發操作,所有資料源依然會全部輸出,也就是并發操作此操作符才有作用 

                //若在此通過thread。sleep()設定等待時間,則輸出資訊會不一樣。相當于模拟并發程度 

                return observable.just((integer + 100) + "switchmap").subscribeon(schedulers.newthread()); 

            } 

        }).observeon(androidschedulers.mainthread()).subscribe(new subscriber() { 

            public void oncompleted() { 

                log.e(tag, "oncompleted: switchmap"); 

            public void onerror(throwable e) { 

                log.e(tag, "onerror: switchmap"); 

            public void onnext(string s) { 

                log.e(tag, "onnext: switchmap "+s); 

        });  

call: switchmapmain 

onnext: switchmap 106switchmap 

oncompleted: switchmap  

當資料源較多時,并不一定是隻輸出最後一項資料,有可能輸出幾項資料,也可能是全部。

groupby

看到這個詞你就應該想到了這個操作符的作用,就是你了解的含義,他将資料源按照你的約定進行分組。我們通過groupby實行将1到10的資料進行就劃分,代碼如下

observable.range(1, 10).groupby(new func1() { 

            public boolean call(integer integer) { 

                return integer % 2 == 0; 

        }).subscribe(new subscriber>() { 

                log.e(tag, "oncompleted:1 "); 

                log.e(tag, "onerror:1 "); 

            public void onnext(groupedobservable booleanintegergroupedobservable) { 

                booleanintegergroupedobservable.tolist().subscribe(new subscriber>() { 

                    @override 

                    public void oncompleted() { 

                        log.e(tag, "oncompleted:2 " ); 

                    } 

                    public void onerror(throwable e) { 

                        log.e(tag, "onerror:2 "); 

                    public void onnext(list integers) { 

                        log.e(tag, "onnext:2 "+integers); 

                }); 

onnext:2 [1, 3, 5, 7, 9] 

oncompleted:2 

onnext:2 [2, 4, 6, 8, 10] 

oncompleted:1  

在上面代碼中booleanintegergroupedobservable變量有一個getkey()方法,該方法傳回的是分組的key,他的值就是groupby方法call回調所用函數的值,在上面也就是integer % 2 == 0的值,及true和false。有幾個分組也是有此值決定的。

scan

操作符對原始observable發射的第一項資料應用一個函數,然後将那個函數的結果作為自己的第一項資料發射。它将函數的結果同第二項資料一起填充給這個函數來産生它自己的第二項資料。它持續進行這個過程來産生剩餘的資料序列。

例如計算1+2+3+4的和

observable.range(1,4).scan(new func2() { 

            public integer call(integer integer, integer integer2) { 

                log.e(tag, "call: integer:"+integer+"  integer2 "+integer2); 

                return integer+integer2; 

        }).subscribe(new subscriber() { 

                log.e(tag, "oncompleted: "); 

                log.e(tag, "onerror: " ); 

            public void onnext(integer integer) { 

                log.e(tag, "onnext: "+integer ); 

onnext: 1 

call: integer:1  integer2 2 

onnext: 3 

call: integer:3  integer2 3 

onnext: 6 

call: integer:6  integer2 4 

onnext: 10 

oncompleted:  

對于scan有一個重載方法,可以設定一個初始值,如上面代碼,初始值設定為10,隻需将scan加個參數scan(10,new func2)。

buffer

操作符将一個observable變換為另一個,原來的observable正常發射資料,變換産生的observable發射這些資料的緩存集合,如果原來的observable發射了一個onerror通知,buffer會立即傳遞這個通知,而不是首先發射緩存的資料,即使在這之前緩存中包含了原始observable發射的資料。

示例代碼

observable.range(10, 6).buffer(2).subscribe(new subscriber>() { 

                log.e(tag, "onerror: "); 

            public void onnext(list integers) { 

                log.e(tag, "onnext: " + integers); 

onnext: [10, 11] 

onnext: [12, 13] 

onnext: [14, 15] 

上面一次性訂閱兩個資料,如果設定參數為6,就一次性訂閱。buffer的另一重載方法buffer(count, skip)從原始observable的第一項資料開始建立新的緩存(長度count),此後每當收到skip項資料,用count項資料填充緩存:開頭的一項和後續的count-1項,它以清單(list)的形式發射緩存,取決于count和skip的值,這些緩存可能會有重疊部分(比如skip count時)。具體執行結果,你可以設定不同的skip和count觀察輸出日志,檢視執行結果及流程。

window

window和buffer類似,但不是發射來自原始observable的資料包,它發射的是observables,這些observables中的每一個都發射原始observable資料的一個子集,最後發射一個oncompleted通知。

observable.range(10, 6).window(2).subscribe(new subscriber>() { 

                log.e(tag, "oncompleted1: "); 

                log.e(tag, "onerror1: "); 

            public void onnext(observable integerobservable) { 

                log.e(tag, "onnext1: "); 

                tv1.append("\n"); 

                integerobservable.subscribe(new subscriber() { 

                        log.e(tag, "oncompleted2: "); 

                        log.e(tag, "onerror2: "); 

                    public void onnext(integer integer) { 

                        log.e(tag, "onnext2: "+integer); 

onnext2: 10 

onnext2: 11 

oncompleted2: 

onnext2: 12 

onnext2: 13 

onnext2: 14 

onnext2: 15 

oncompleted1:  

window和buffer一樣也有不同的重載方法。這兩個操作符相對其他操作符不太容易了解,可以去rxjava github了解,裡面有圖示解析。當然最好的了解方式就是通過更改變量的值,去觀察輸出的日志資訊。

好了,這篇文章就介紹到這裡。若文中有錯誤的地方,歡迎指正。謝謝。

本文作者:佚名

來源:51cto

繼續閱讀