/**
* @author :houde
* 時間:2018/1/23
* Des:RxJava 變換操作符
*/
public class RxOperateActivity extends AppCompatActivity {
private final String TAG = "RxOperateActivity";
Observable<Integer> observable1 = Observable.just(1,2,3,4);
Observable<String> observable2 = Observable.just("A","B","C");
private Observer<String> stringObserver = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG,"開始采用subscribe連接配接");
}
@Override
public void onNext(String s) {
Log.e(TAG,s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG,e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG,"對Complete事件作出響應");
}
};
private Observer<Integer> intObserver = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG,"開始采用subscribe連接配接");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG,"事件 = " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG,e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG,"對Complete事件作出響應");
}
} ;
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_image);
//轉換操作符
/**
* 作用
* 對被觀察者發送的每1個事件都通過指定的函數處理,進而變換成另外一種事件
* 即:将被觀察者發送的事件轉換為任意的類型事件。
* 應用場景
* 資料類型轉換
* 具體使用
* 下面以将 使用Map()将事件的參數從整型變換成字元串類型為例子說明
*/
map();
/**
* 作用:
* 将被觀察者發送的事件序列進行拆分&單獨轉換,再合并成一個新的事件序列,最後再進行發送
*
* 原理
* 1.為事件序列中每個事件都建立一個 Observable 對象;
* 2.将對每個 原始事件 轉換後的 新事件 都放入到對應 Observable對象;
* 3.将建立的每個Observable 都合并到一個 建立的、總的Observable 對象;
* 4.建立的、總的Observable 對象 将 新合并的事件序列 發送給觀察者(Observer)
* 應用場景
* 無序的将被觀察者發送的整個事件序列進行變換
*/
flatMap();
/**
* 作用:類似FlatMap()操作符
* 與FlatMap()的 差別在于:拆分 & 重新合并生成的事件序列 的順序 = 被觀察者舊序列生産的順序
* 應用場景
* 有序的将被觀察者發送的整個事件序列進行變換
*/
concatMap();
/**
* 作用
* 定期從 被觀察者(Observable)需要發送的事件中擷取一定數量的事件&放到緩存區中,
* 最終發送
*
* 應用場景
* 緩存被觀察者發送的事件
*/
buffer();
//組合和并操作符
/**
* 作用
* 組合多個被觀察者一起發送資料,合并後 按發送順序串行執行
* 二者差別:
* 組合被觀察者的數量,即concat()組合被觀察者數量≤4個,
* 而concatArray()則可>4個
*/
concat();
concatArray();
/**
* 作用
* 組合多個被觀察者一起發送資料,合并後 按時間線并行執行
*
* 二者差別:
* 組合被觀察者的數量,即merge()組合被觀察者數量≤4個,
* 而mergeArray()則可>4個
*
* 差別上述concat()操作符:
* 同樣是組合多個被觀察者一起發送資料,但concat()操作符合并後是按發送順序串行執行
*/
merge();
mergeArray();
/**
* 背景:
* 使用merge和concat操作符時,
* 沖突:
* 若其中一個被觀察者發出onError事件,則會終止其他被觀察者繼續發送事件
* 解決方案:
* 若希望onError事件推遲到其他被觀察者發送完事件之後再觸發
* 即需要使用對應的mergeDelayError()或concatDelayError操作符
*
*/
concatDelayError();
mergeDelayError();
//事件的合并
/**
* 作用
* 合并多個被觀察者(Observable)發送的事件,
* 生成一個新的事件序列(即組合過後的事件序列),并最終發送
* 特别注意:
* 事件組合方式 = 嚴格按照原先事件序列 進行對位合并
* 最終合并的事件數量 = 多個被觀察者(Observable)中數量最少的數量
* 特别注意:
* 盡管被觀察者2的事件D沒有事件與其合并,但還是會繼續發送
* 若在被觀察者1 & 被觀察者2的事件序列最後發送onComplete()事件,
* 則被觀察者2的事件D也不會發送,測試結果如下
* 定義:
* 屬于Rxjava中的組合
* 作用:
* 1.合并多個被觀察者(Observable)發送的事件
* 2.生成一個新的事件序列(即合并之後的序列),并最終發送
* 原理:
* 1.事件組合方式 = 嚴格按照原先事件序列進行對位合并
* 2.最終合并的事件數量 = 多個被觀察者(Observable)中數量最少的數量
*
* 應用場景:
* 1.當展示的資訊需要從多個地方擷取(即 資訊 = 資訊1 + 資訊2)& 統一結合後再展示
* 2.如:合并網絡請求的發送 & 統一展示結果
*/
zip();
/**
* 作用
* 當兩個Observables中的任何一個發送了資料後,将先發送了資料的Observables
* 的最新(最後)一個資料與另外一個Observable發送的每個資料結合,最終基于該
* 函數的結果發送資料
* 與Zip()的差別:
* Zip() = 按個數合并,
* 即1對1合并;CombineLatest() = 按時間合并,即在同一個時間點上合并
*
* combineLatestDelayError()
* 作用類似于concatDelayError() / mergeDelayError() ,即錯誤處理,此處不作過多描述
*/
combineLatest();
/**
* 作用
* 把被觀察者需要發送的事件聚合成1個事件 & 發送
*
* 聚合的邏輯根據需求撰寫,但本質都是前2個資料聚合,然後與後1個資料繼續進行聚合,依次類推
*/
reduce();
/**
*作用
* 将被觀察者Observable發送的資料事件收集到一個資料結構裡
*/
collect();
//發送事件前追加發送事件
/**
* 作用
* 在一個被觀察者發送事件前,追加發送一些資料/一個新的被觀察者
*/
startWith();
startWithArray();
//統計發送事件數量
/**
* 作用
* 統計被觀察者發送事件的數量
*/
count();
}
private void count() {
observable1.count().subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e(TAG,"發送事件的次數 = " + aLong);
}
});
}
private void startWithArray() {
Observable.just(4,5,6,7)
.startWith(0)
.startWithArray(1,2,3)
.subscribe(intObserver);
}
private void startWith() {
Observable.just(1,2,3,4)
.startWith(0)
.subscribe(intObserver);
}
private void collect() {
observable1.collect(
// 1. 建立資料結構(容器),用于收集被觀察者發送的資料
new Callable<ArrayList<Integer>>() {
@Override
public ArrayList<Integer> call() throws Exception {
return new ArrayList<>();
}
// 2. 對發送的資料進行收集
}, new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
// 參數說明:list = 容器,integer = 後者資料
list.add(integer);
// 對發送的資料進行收集
}
}).subscribe(new Consumer<ArrayList<Integer>>() {
@Override
public void accept(ArrayList<Integer> list) throws Exception {
Log.e(TAG, "本次發送的資料是: " + list);
}
});
}
private void reduce() {
observable1.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer s1, Integer s2) throws Exception {
Log.e(TAG, "本次計算的資料是: "+s1 +" 乘 "+ s2);
return s1 * s2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "最終計算的結果是: " + integer);
}
});
}
private void combineLatest() {
Log.e(TAG,"-------------------combineLatest-------------------");
Observable.combineLatest(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return s + integer;
}
}).subscribe(stringObserver);
}
private void zip() {
Log.e(TAG,"-------------------zip-------------------");
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(stringObserver);
}
private void mergeDelayError() {
Log.e(TAG,"-------------------mergeDelayError-------------------");
Observable.mergeArrayDelayError(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
// 發送Error事件,因為使用了concatDelayError,是以第2個Observable将會發送事件,等發送完畢後,再發送錯誤事件
emitter.onError(new NullPointerException("這裡發送了一個onError()"));
emitter.onComplete();
}
}),
Observable.just(4, 5, 6))
.subscribe(intObserver);
}
private void concatDelayError() {
Log.e(TAG,"-------------------concatDelayError-------------------");
Observable.concat(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
// 發送Error事件,因為無使用concatDelayError,是以第2個Observable将不會發送事件
emitter.onError(new NullPointerException("這裡發送了一個onError()"));
emitter.onComplete();
}
}),
Observable.just(4, 5, 6))
.subscribe(intObserver);
}
private void mergeArray(){
Log.e(TAG,"-------------------mergeArray-------------------");
Observable.mergeArray(Observable.just(1,2,3),
Observable.just(4,5,6),
Observable.just(7,8,9),
Observable.just(10,11,12),
Observable.just(13,14,15),
Observable.just(16,17,18)
).subscribe(intObserver);
}
private void merge(){
Log.e(TAG,"-------------------merge-------------------");
Observable.merge(Observable.just(1,2,3,4),
Observable.just(5,6),
Observable.just(7,8,9),
Observable.just(10,11,12,13)
)
.subscribe(intObserver);
}
private void concatArray(){
Log.e(TAG,"-------------------concatArray-------------------");
Observable.concatArray(Observable.just(1,2,3),
Observable.just(4,5,6),
Observable.just(9,10),
Observable.just(11,12,13),
Observable.just(14,15,16)
).subscribe(intObserver);
}
private void concat(){
// concat():組合多個被觀察者(≤4個)一起發送資料
// 注:串行執行
Log.e(TAG,"-------------------concat-------------------");
Observable.concat(Observable.just(1,2,3),
Observable.just(4,5,6),
Observable.just(9,10)
).subscribe(intObserver);
}
private void buffer() {
Log.e(TAG,"-------------------buffer-------------------");
Observable.just(1,2,3,4,5,6,7,8)
// 設定緩存區大小 & 步長
// 緩存區大小 = 每次從被觀察者中擷取的事件數量
// 步長 = 每次擷取新事件的數量
.buffer(3,1)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG,"開始采用subscribe連接配接");
}
@Override
public void onNext(List<Integer> ints) {
Log.e(TAG,"緩存區裡的事件個數" + ints.size());
for(int i = 0 ,size = ints.size(); i < size;i++){
Log.e(TAG,"事件 = " + i);
}
}
@Override
public void onError(Throwable e) {
Log.e(TAG,e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG,"對Complete事件作出響應");
}
});
}
private void concatMap() {
Log.e(TAG,"-------------------concatMap-------------------");
Observable.just(4,3,2,1)
.concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.fromIterable(getEvents(integer));
}
})
.subscribe(stringObserver);
}
private void flatMap() {
Log.e(TAG,"-------------------flatMap-------------------");
Observable.just(1,2,3,4)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.fromIterable(getEvents(integer));
}
}).subscribe(stringObserver);
}
@NonNull
private List<String> getEvents(Integer integer) {
List<String> event = new ArrayList<>(3);
for(int i = 0 ; i < 3 ; i++){
event.add("我是事件 " + integer + "拆分後的子事件" + i);
}
return event;
}
private void map() {
Log.e(TAG,"-------------------map-------------------");
Observable.just(1,2,3,4).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "使用 Map變換操作符 将事件" + integer +"的參數從 整型"+integer + " 變換成 字元串類型" + integer;
}
}).subscribe(stringObserver);
}
}