contact 連接配接

concat 有道.png

concat.png
看到沒有,是一個接一個的,用圖來說就是

contact
例如,依次檢查memory、disk和network中是否存在資料,任何一步一旦發現資料後面的操作都不執行。
Observable memory = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super String> subscriber) {
if (memoryCache != null) {
subscriber.onNext(memoryCache);
} else {
subscriber.onCompleted();
}
}
});
Observable disk = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super String> subscriber) {
String cachePref = rxPreferences.getString("cache").get();
if (!TextUtils.isEmpty(cachePref)) {
subscriber.onNext(cachePref);
} else {
subscriber.onCompleted();
}
}
});
Observable network = Observable.just("network");
//依次檢查memory、disk、network
Observable.concat(memory, disk, network)
.first() //這裡可以做緩存過期監測
.subscribeOn(Schedulers.newThread())
.subscribe(s -> {
memoryCache = "memory";
System.out.println("--------------subscribe: " + s);
});
對于contact,有一點務必要注意,那就是上一個資料源不可用,請務必subscriber.onCompleted(); 這樣才能繼續走下一個資料源。
上面提到了緩存過期監測,那麼舉個🌰吧。
Observable source = Observable.concat(
sources.memory(),
sources.disk(),
sources.network() )
.first(data -> data != null && data.isUpToDate());//lamada表達式而已,别怕
讓我們看一下first的定義吧,當然如果first什麼參數不傳,那就直接第一個了。

first.png
merge 合并

merge
用圖來說話就是:

merge
例如一組資料來自網絡,一組資料來自檔案,需要合并兩組資料一起展示。
Observable.merge(getDataFromFile(), getDataFromNet())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber() {
@Override
public void onCompleted() {
log.d("done loading all data");
}
@Override
public void onError(Throwable e) {
log.d("error");
}
@Override
public void onNext(String data) {
log.d("all merged data will pass here one by one!")
});
compose 組合
可以将一組變換用到流上,例如抛物線寫的那個例子

compose
給 Android 開發者的 RxJava 詳解
以及這篇文章,我想也可以幫助你。
http://blog.danlew.net/2015/03/02/dont-break-the-chain/