RxJava的操作符
建立和訂閱一個 Observable 是足夠簡單的,可能這并不是非常有用的,但這隻是用 RxJava 的一個開始。通過調用操作符,任何的 Observable 都能進行輸出轉變,多個Operators 能連結到 Observable上。RxJava 提供了對事件序列進行變換的支援,這是它的核心功能之一,所謂變換,就是将事件序列中的對象或整個序列進行加工處理,轉換成不同的事件或事件序列。
map()
map()是RXJava操作符中最簡單的一個,它所實作的功能為對事件對象的直接轉換,并且是一對一的轉換。如下面所示的例子,我通過map()操作符進行了如下操作:通過new Date()方法建立一個目前時間的對象,通過map()操作符對這個對象進行格式的轉換後輸出為字元串,并在字元串前面加上字首“Current time: ”,最終将字元串顯示到TextView控件當中。
Observable.just(new Date()).map(new Func1<Date, String>() {
@Override
public String call(Date date) {
String s = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
return "Current time: " + s;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
sb.append(s).append("\n");
tv_rx_text.setText(sb.toString());
}
});
flatMap()
有了map()操作符之後,我們已經能夠比較容易的對事件對象進行所需的轉換操作,但是您也能發現在上一部分我強調了一下map()操作符所進行的轉換是一對一的,這就是這個操作符的缺陷。如果我們需要對一個事件中的對象進行轉換,而這個事件包含N個事件對象,那麼我們相對于要進行N次的map()操作,這無疑是很浪費的。有人說了我們可以将事件中的N個對象封轉到一個List裡面,我們在轉換的時候對List進行周遊,分别對每個事件對象進行轉換。這能實作現在的需求嗎?答案是肯定的,但是如果我們不想去封轉一個List呢,這個時候一個新的操作符flatMap()就出現了,這個操作符實作的功能也是對事件對象的轉換,并且它是支援一對多的轉換。
Subscriber<Student.Course> subscriber = new Subscriber<Student.Course>() {
@Override
public void onCompleted() {
sb.append("onCompleted\n");
tv_rx_text.setText(sb.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Student.Course course) {
sb.append(course.getCourse() + "\n");
tv_rx_text.setText(sb.toString());
}
};
Observable.just(student[i]).flatMap(new Func1<Student, Observable<Student.Course>>() {
@Override
public Observable<Student.Course> call(Student student) {
return Observable.from(student.getCourseList());
}
}).subscribe(subscriber);
如上述代碼所示,這個示例中我們做了這麼一件事情:我們傳入了一個Student對象(裡面包含一個内部類Course用來儲存student的課程資訊),我們通過flatMap()對傳入的Student對象進行轉換,将Student對象轉換成了一個Course對象,并用這個Course對象構造一個Observable對象傳回進行處理,最後将Course對象中包含的課程資訊顯示在TextView中。
從上面的代碼中可以看出,map()和flatMap()有一個共同點:都是把傳入的參數轉換之後傳回另一個對象。但是和map()方法不同的是,flatMap()中傳回的是一個Observable對象,并且這個Observable不是直接被發送到了Subscriber的回調方法中。flatMap()的原理是這樣的:
- 使用傳入的事件對象建立一個 Observable 對象;
- 并不發送這個 Observable, 而是将它激活,于是它開始發送事件;
- 每一個建立出來的 Observable 發送的事件,都被彙入同一個 Observable ,而這個 Observable 負責将這些事件統一交給 Subscriber 的回調方法。
這三個步驟,把事件拆成了兩級,通過一組新建立的 Observable 将初始的對象『鋪平』之後通過統一路徑分發了下去。而這個『鋪平』就是 flatMap() 所謂的 flat。
filter()
filter()操作符是可以對Observable流程的資料進行一層過濾處理,filter() 傳回為 false 的值将不會發出到 Subscriber。
int num = (int)(Math.random()*100);
Observable.just(num).filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
boolean isEvenNumber = integer % 2 == 0;
return isEvenNumber;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
sb.append("Current number: " + integer + "\n");
tv_rx_text.setText(sb.toString());
}
});
上述示例為随機生成一個0~100之間的數字,當數字為偶數時,對數字按“Current number: ”的格式顯示在TextView中。
interval()
對于輪詢器大家一定不陌生,開發中無論是Java的Timer+TimeTask , 還是Android的Hanlder都可實作。當然在RxJava中也有這樣的實作方式,那就是使用interval()操作符。我們使用interval()實作一個10s的計時器,每間隔一面在TextView中更新一下時間,直到10s。代碼如下:
Observable.interval(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
sb.append("Complete ! \n");
tvRxText.setText(sb.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
if(aLong < 10){
sb.append(aLong + 1 + " s" + "\n");
tvRxText.setText(sb.toString());
}else {
onCompleted();
}
}
});
interval()方法的第一個參數為每次更新的時間間隔,第二個參數為該時間間隔的機關。通過運作此代碼,我們發現确實能實作10s計時器的功能,但是到了10s以後,計時器仍未停止,它會一直下去(TextView中的“Complete !”依然每隔一秒列印一次)。是以這其實也是一種浪費,網上有說可以在onNext()裡計算時間,達到要求時進行解綁(目前我還沒找到解綁interval的方法,如果您知道,請賜教)。在這種情況下,take()操作符應運而生,它和interval()能完美結合實作計時器的功能,接下來我們來看一下take()操作符的使用。
take()
take從字面意思上可以了解就是“拿,取”的意思。是以take()所起的作用也就是取的作用,根據傳入參數的數值N來擷取前N個onNext()的結果,達到指定數值之後,調用onCompleted()完成此次計時操作。
Observable.interval(1, TimeUnit.SECONDS)
.take(10)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
sb.append("Complete ! \n");
tvRxText.setText(sb.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
sb.append(aLong + 1 + " s" + "\n");
tvRxText.setText(sb.toString());
}
});
take()操作符的使用方法如上所示,雖然比較簡單,但是卻很好的解決了interval計時不能停的問題。
總的來說,RxJava中的操作符可以說是RxJava中比較核心的部分,合理的運用這些操作符會讓我們的工作事半功倍。
參考:
https://github.com/ReactiveX/RxJava
http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2015/1012/3572.html#toc_1
http://blog.csdn.net/lzyzsd/article/details/44094895
http://www.ithao123.cn/content-9344110.html
http://blog.csdn.net/tangxl2008008/article/details/51334295