對rxJava不了解的同學可以先看
RxJava 和 RxAndroid 一 (基礎)
RxJava 和 RxAndroid 二(操作符的使用)
RxJava 和 RxAndroid 三(生命周期控制和記憶體優化)
RxJava 和 RxAndroid 四(RxBinding的使用)
本文将有幾個例子說明,rxjava線程排程的正确使用姿勢。
例1
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Logger.v( "rx_call" , Thread.currentThread().getName() );
subscriber.onNext( "dd");
subscriber.onCompleted();
}
})
.map(new Func1<String, String >() {
@Override
public String call(String s) {
Logger.v( "rx_map" , Thread.currentThread().getName() );
return s + "88";
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Logger.v( "rx_subscribe" , Thread.currentThread().getName() );
}
}) ;
結果
/rx_call: main -- 主線程
/rx_map: main -- 主線程
/rx_subscribe: main -- 主線程
例2
new Thread(new Runnable() {
@Override
public void run() {
Logger.v( "rx_newThread" , Thread.currentThread().getName() );
rx();
}
}).start();
void rx(){
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Logger.v( "rx_call" , Thread.currentThread().getName() );
subscriber.onNext( "dd");
subscriber.onCompleted();
}
})
.map(new Func1<String, String >() {
@Override
public String call(String s) {
Logger.v( "rx_map" , Thread.currentThread().getName() );
return s + "88";
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Logger.v( "rx_subscribe" , Thread.currentThread().getName() );
}
}) ;
}
結果
/rx_newThread: Thread-564 -- 子線程
/rx_call: Thread-564 -- 子線程
/rx_map: Thread-564 -- 子線程
/rx_subscribe: Thread-564 -- 子線程
- 通過例1和例2,說明,Rxjava預設運作在目前線程中。如果目前線程是子線程,則rxjava運作在子線程;同樣,目前線程是主線程,則rxjava運作在主線程
例3
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Logger.v( "rx_call" , Thread.currentThread().getName() );
subscriber.onNext( "dd");
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<String, String >() {
@Override
public String call(String s) {
Logger.v( "rx_map" , Thread.currentThread().getName() );
return s + "88";
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Logger.v( "rx_subscribe" , Thread.currentThread().getName() );
}
}) ;
/rx_call: RxCachedThreadScheduler-1 --io線程
/rx_map: main --主線程
/rx_subscribe: main --主線程
例4
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Logger.v( "rx_call" , Thread.currentThread().getName() );
subscriber.onNext( "dd");
subscriber.onCompleted();
}
})
.map(new Func1<String, String >() {
@Override
public String call(String s) {
Logger.v( "rx_map" , Thread.currentThread().getName() );
return s + "88";
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Logger.v( "rx_subscribe" , Thread.currentThread().getName() );
}
}) ;
/rx_call: RxCachedThreadScheduler-1 --io線程
/rx_map: RxCachedThreadScheduler-1 --io線程
- 通過例3、例4 可以看出 .subscribeOn(Schedulers.io()) 和 .observeOn(AndroidSchedulers.mainThread()) 寫的位置不一樣,造成的結果也不一樣。從例4中可以看出 map() 操作符預設運作在事件産生的線程之中。事件消費隻是在 subscribe() 裡面。
- 對于 create() , just() , from() 等 --- 事件産生
map() , flapMap() , scan() , filter() 等 -- 事件加工
subscribe() -- 事件消費
- 事件産生:預設運作在目前線程,可以由 subscribeOn() 自定義線程
事件加工:預設跟事件産生的線程保持一緻, 可以由 observeOn() 自定義線程
事件消費:預設運作在目前線程,可以有observeOn() 自定義
例5 多次切換線程
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Logger.v( "rx_call" , Thread.currentThread().getName() );
subscriber.onNext( "dd");
subscriber.onCompleted();
}
})
.observeOn( Schedulers.newThread() ) //新線程
.map(new Func1<String, String >() {
@Override
public String call(String s) {
Logger.v( "rx_map" , Thread.currentThread().getName() );
return s + "88";
}
})
.observeOn( Schedulers.io() ) //io線程
.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
Logger.v( "rx_filter" , Thread.currentThread().getName() );
return s != null ;
}
})
.subscribeOn(Schedulers.io()) //定義事件産生線程:io線程
.observeOn(AndroidSchedulers.mainThread()) //事件消費線程:主線程
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Logger.v( "rx_subscribe" , Thread.currentThread().getName() );
}
}) ;
/rx_call: RxCachedThreadScheduler-1 -- io 線程
/rx_map: RxNewThreadScheduler-1 -- new出來的線程
/rx_filter: RxCachedThreadScheduler-2 -- io線程
/rx_subscribe: main -- 主線程
例6:隻規定了事件産生的線程
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.v( "rx--create " , Thread.currentThread().getName() ) ;
subscriber.onNext( "dd" ) ;
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;
}
}) ;
/rx--create: RxCachedThreadScheduler-4 // io 線程
/rx--subscribe: RxCachedThreadScheduler-4 // io 線程
例:7:隻規定事件消費線程
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.v( "rx--create " , Thread.currentThread().getName() ) ;
subscriber.onNext( "dd" ) ;
}
})
.observeOn( Schedulers.newThread() )
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;
}
}) ;
/rx--create: main -- 主線程
/rx--subscribe: RxNewThreadScheduler-1 -- new 出來的子線程
從例6可以看出,如果隻規定了事件産生的線程,那麼事件消費線程将跟随事件産生線程。
從例7可以看出,如果隻規定了事件消費的線程,那麼事件産生的線程和 目前線程保持一緻。
例8:線程排程封裝
在Android 常常有這樣的場景,背景處理處理資料,前台展示資料。
一般的用法:
Observable
.just( "123" )
.subscribeOn( Schedulers.io())
.observeOn( AndroidSchedulers.mainThread() )
.subscribe(new Action1() {
@Override
public void call(Object o) {
}
}) ;
但是項目中這種場景有很多,是以我們就想能不能把這種場景的排程方式封裝起來,友善調用。
簡單的封裝
public Observable apply( Observable observable ){
return observable.subscribeOn( Schedulers.io() )
.observeOn( AndroidSchedulers.mainThread() ) ;
}
使用
apply( Observable.just( "123" ) )
.subscribe(new Action1() {
@Override
public void call(Object o) {
}
}) ;
弊端:雖然上面的這種封裝可以做到線程排程的目的,但是它破壞了鍊式程式設計的結構,是程式設計風格變得不優雅。
改進:Transformers 的使用(就是轉化器的意思,把一種類型的Observable轉換成另一種類型的Observable )
改進後的封裝
Observable.Transformer schedulersTransformer = new Observable.Transformer() {
@Override public Object call(Object observable) {
return ((Observable) observable).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
}
};
使用
Observable
.just( "123" )
.compose( schedulersTransformer )
.subscribe(new Action1() {
@Override
public void call(Object o) {
}
}) ;
弊端:雖然保持了鍊式程式設計結構的完整,但是每次調用 .compose( schedulersTransformer ) 都是 new 了一個對象的。是以我們需要再次封裝,盡量保證單例的模式。
package lib.app.com.myapplication;
import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;
/**
* Created by ${zyj} on 2016/7/1.
*/
public class RxUtil {
private final static Observable.Transformer schedulersTransformer = new Observable.Transformer() {
@Override public Object call(Object observable) {
return ((Observable) observable).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
}
};
public static <T> Observable.Transformer<T, T> applySchedulers() {
return (Observable.Transformer<T, T>) schedulersTransformer;
}
}
Observable
.just( "123" )
.compose( RxUtil.<String>applySchedulers() )
.subscribe(new Action1() {
@Override
public void call(Object o) {
}
}) ;