RxJava describes itself on its GitHub home page as "a library for composing asynchronous and event-based programs using observable sequences for the Java VM".
Simplest usage:
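import android.util.Log;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

// TAG is assumed to be a String constant defined in the enclosing class.
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("first item");
        emitter.onNext("second item");
        // onComplete and onError are both terminal events; send only one of them.
        emitter.onComplete();
        // emitter.onError(new Exception("ssfas"));
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "--------onSubscribe------");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "--------onNext------:" + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "--------onError------");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "--------onComplete------");
    }
});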
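Flowable:
/**
 * Backpressure strategies (128 is the default buffer size, Flowable.bufferSize()):
 * BackpressureStrategy.ERROR----signals MissingBackpressureException when the downstream cannot keep up--128
 * BackpressureStrategy.BUFFER---unbounded buffer
 * BackpressureStrategy.DROP-----drops the items that cannot be stored--128
 * BackpressureStrategy.LATEST---keeps only the latest item-----128
 * Corresponding operators:
 * onBackpressureBuffer()
 * onBackpressureDrop()
 * onBackpressureLatest()
 */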
// Needs io.reactivex.{Flowable, FlowableEmitter, FlowableOnSubscribe, BackpressureStrategy}
// and org.reactivestreams.{Subscriber, Subscription}.
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
    @Override
    public void subscribe(FlowableEmitter<String> emitter) throws Exception {
        emitter.onNext("111111111111");
        emitter.onNext("2222222222222");
        emitter.onNext("3333333333333");
        emitter.onNext("4444444444444");
        emitter.onComplete();
    }
}, BackpressureStrategy.ERROR);

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onSubscribe(Subscription s) {
        Log.i(TAG, "---onSubscribe---");
        // Nothing is delivered until the subscriber requests items.
        s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, s);
    }

    @Override
    public void onError(Throwable t) {
        Log.i(TAG, "---onError---");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "---onComplete---");
    }
};
flowable.subscribe(subscriber);
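The four strategies listed in the comment above differ only in what happens to an item that arrives when the subscriber has no outstanding request. The following is a rough model of that decision in plain Java, with no RxJava on the classpath. `BackpressureModel`, `Strategy`, `Outcome`, and `emit` are hypothetical names made up for this illustration, and the model ignores threading and later `request` calls, which the real operators handle.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not RxJava code: shows what each strategy does with
// items that arrive after the subscriber's requested amount is used up.
public class BackpressureModel {
    public enum Strategy { ERROR, BUFFER, DROP, LATEST }

    public static final class Outcome {
        public final List<Integer> delivered = new ArrayList<>(); // passed to onNext
        public final List<Integer> pending = new ArrayList<>();   // held for a later request
        public boolean overflowError = false;                     // true if ERROR tripped
    }

    // Emits 1..emitted to a subscriber that requested `requested` items up front.
    public static Outcome emit(Strategy strategy, int emitted, int requested) {
        Outcome out = new Outcome();
        int credit = requested;
        for (int item = 1; item <= emitted && !out.overflowError; item++) {
            if (credit > 0) {
                // Demand is available: deliver immediately.
                out.delivered.add(item);
                credit--;
                continue;
            }
            switch (strategy) {
                case ERROR:
                    // The stream fails (MissingBackpressureException in RxJava).
                    out.overflowError = true;
                    break;
                case BUFFER:
                    // Keep everything, without bound.
                    out.pending.add(item);
                    break;
                case DROP:
                    // Discard the item.
                    break;
                case LATEST:
                    // Keep only the newest item.
                    out.pending.clear();
                    out.pending.add(item);
                    break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            Outcome o = emit(s, 5, 2);
            System.out.println(s + " delivered=" + o.delivered
                    + " pending=" + o.pending + " error=" + o.overflowError);
        }
    }
}
```

With five items emitted and two requested, every strategy delivers items 1 and 2. After that, ERROR fails, BUFFER holds 3, 4, 5, DROP holds nothing, and LATEST holds only 5.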
Thread switching:
// Needs io.reactivex.schedulers.Schedulers, io.reactivex.android.schedulers.AndroidSchedulers (RxAndroid)
// and io.reactivex.disposables.CompositeDisposable.
// compositeDisposable is assumed to be a CompositeDisposable field of the enclosing class.
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "--------onSubscribe------:" + Thread.currentThread().getName());
        compositeDisposable.add(d);
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "--------onNext------:" + Thread.currentThread().getName());
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "--------onError------");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "--------onComplete------" + Thread.currentThread().getName());
    }
};
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        Log.i(TAG, "--------Observable------" + Thread.currentThread().getName());
        emitter.onNext("first item");
        emitter.onComplete();
    }
});
observable.subscribeOn(Schedulers.computation()) // options: Schedulers.newThread(), Schedulers.io(), Schedulers.computation()
        .observeOn(AndroidSchedulers.mainThread()) // the Android main thread, provided by RxAndroid
        .subscribe(observer);
map operator:
A transformation operator. In this example it converts each int to a String, which is then logged.
// Needs io.reactivex.functions.Function.
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
    }
}).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer integer) throws Exception {
        return "This is command no. " + integer;
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, s);
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
    }
});
flatMap operator:
This operator maps each item to a new Observable and merges the emissions of those Observables into a single stream. A simple use case: a login request that has to follow a successful registration.
// Needs io.reactivex.ObservableSource.
final String[][] name = {{"111", "1111", "11111"},
        {"222", "2222", "22222"},
        {"333", "3333", "33333"}};
Observable.fromArray(name).flatMap(new Function<String[], ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(String[] str) throws Exception {
        // Each inner array becomes its own Observable<String>.
        return Observable.fromArray(str);
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, s);
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
    }
});
filter operator:
Filters events: the Observer only receives the items for which filter returns true.
// Needs io.reactivex.functions.Predicate.
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        for (int i = 0; i < 100; i++) {
            emitter.onNext(i);
        }
    }
}).filter(new Predicate<Integer>() {
    @Override
    public boolean test(Integer integer) throws Exception {
        // Keep only the multiples of 10.
        return integer % 10 == 0;
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(Integer integer) {
        Log.i(TAG, "Filtered value: " + integer);
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
    }
});
zip operator:
Combines the events emitted by two sources pairwise. The result completes once the shorter source runs out, so "DDD" in this example has no partner and is never emitted.
// Needs io.reactivex.functions.BiFunction.
Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("111");
        emitter.onNext("222");
        emitter.onNext("333");
        emitter.onComplete();
    }
}).subscribeOn(Schedulers.newThread());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("AAA");
        emitter.onNext("BBB");
        emitter.onNext("CCC");
        emitter.onNext("DDD");
        emitter.onComplete();
    }
}).subscribeOn(Schedulers.newThread());
Observable.zip(observable1, observable2, new BiFunction<String, String, String>() {
    @Override
    public String apply(String s, String s2) throws Exception {
        return s + s2;
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, s);
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "-------complete---------");
    }
});
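Other operators in brief:
concat: also combines two Observables, but unlike zip it emits them one after the other. If the first source emits 1, 2, 3 and the second emits a, b, c, zip yields 1a, 2b, 3c, whereas concat yields 1, 2, 3, a, b, c.
concatMap: does the same job as flatMap, but concatMap preserves the order of the source items, whereas flatMap gives no ordering guarantee.
distinct: filters out duplicate events.
timer: emits once after the given delay, and only once.
interval: a periodic task that emits once every given period.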
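To make the zip, concat, and distinct descriptions above concrete, here is a minimal sketch that models their results on plain lists. It is not RxJava code: `CombineModel` and its methods are hypothetical names for illustration, and the model is synchronous, so it says nothing about timing or threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical list-based model of three operators; it only mirrors
// which items come out and in what order.
public class CombineModel {
    // zip: pair items by position; stops when the shorter source runs out.
    public static List<String> zip(List<String> first, List<String> second) {
        List<String> out = new ArrayList<>();
        int pairs = Math.min(first.size(), second.size());
        for (int i = 0; i < pairs; i++) {
            out.add(first.get(i) + second.get(i));
        }
        return out;
    }

    // concat: all items of the first source, then all items of the second.
    public static List<String> concat(List<String> first, List<String> second) {
        List<String> out = new ArrayList<>(first);
        out.addAll(second);
        return out;
    }

    // distinct: keep the first occurrence of each item, in arrival order.
    public static List<String> distinct(List<String> items) {
        return new ArrayList<>(new LinkedHashSet<>(items));
    }

    public static void main(String[] args) {
        List<String> numbers = Arrays.asList("1", "2", "3");
        List<String> letters = Arrays.asList("a", "b", "c");
        System.out.println(zip(numbers, letters));
        System.out.println(concat(numbers, letters));
        System.out.println(distinct(Arrays.asList("1", "2", "1", "3", "2")));
    }
}
```

The model leaves out timer and interval because they are defined by elapsed time rather than by which items they combine.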