gitHubd位址: https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid
引入依賴: compile 'io.reactivex:rxjava:1.0.14' compile 'io.reactivex:rxandroid:1.0.1'
1、概念 RxJava基本概念:Observable(被觀察者),Observer(觀察者),subscribe(訂閱) ,被觀察者和觀察者通過subscribe方法實作訂閱關系。RxJava的事件回調方法有OnNext(),onCompleted(),onError()。 onCompleted:事件隊列完成是執行。RxJava不僅把每個事件單獨處理,還會把他們看做一個隊列。RxJava規定,當不會有新的onNext()發出時,需要觸發onCompleted()方法作為标志。 onError:事件隊列異常時執行。在事件處理過程中出現異常時onError會被觸發,同時隊列自動終止。不允許在有事件發出。 onCompleted和onError在一個正确運作的隊列中二者是互 斥的,整個隊列的執行隻會調用其中的一個方法。
2、基本實作 (1) 建立觀察者Observer RxJava中的Observer的接口實作如下
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
};
除了以上的實作方式之外 RxJava還内置了一個實作了
Observer
的抽象類:
Subscriber
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
@Override
public void onStart() {
super.onStart();
}
};
此實作類多了一個onStart()方法 可以用于一些初始化的操作。
(2) 建立被觀察者 Observable
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("111");
subscriber.onNext("222");
subscriber.onNext("333");
subscriber.onCompleted();
}
});
Observable.create()是最基本的建立Observable方式,除此之外 還有just() from()等方式。 在傳回的observable中可以通過isUnsubscribed()來判斷是否已經訂閱 最後通過unsubscribe()來解除訂閱。
(3) 通過subscribe将二者關聯起來
observable.subscribe(observer);
最後通過subscribe将oberver和observable進行關聯。 這樣一個訂閱事件就完成了。當 Observable一被訂閱就會觸發observer的call方法。此時 将執行三次onNext()和一次onCompleted()。
3、Scheduler Scheduler是RxJava的線程排程器。主要有以下幾個:
Schedulers.immediate():直接在目前線程執行,相當于不指定線程。是預設的Scheduler。
Schedulers.newThread():總是啟用新線程,并在新線程執行操作。
Schedulers.io():I/O操作(讀寫檔案 讀寫資料庫 網絡資訊互動) 所使用的Scheduler。行為模式和newThread()差不多,差別在于IO()的内部實作是用在一個無數量上限的線程池,可以重用空閑的線程。是以,多數情況下io()比newThread()更有效率。不要把計算工作放在io()中,可以避免建立不必要的線程。 Schedulers.computation: 計算所使用的
Scheduler
。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個
Scheduler
使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在
computation()
中,否則 I/O 操作的等待時間會浪費 CPU。
除此之外,android還有一個專門的AndroidSchedulers.mainThread(),它指定的操作将在Android主線程運作。
有了切換線程的方法後專門使用呢? RxJava提高了subscribeOn() 和 observeOn() 兩個方法對線程進行控制。
subscribeOn():指定subscribe所發生的線程,即Observable.OnSubscribe被激活時所處的線程。也叫作時間産生的線程。
observeOn():指定Subscriber所運作的線程。也叫作事件消費的線程。
String[] names = {"AA", "BB", "CC", "DD", "EE", "FF"};
Observable.from(names)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("lw", "ACtion" + s);
}
});
Action1
這個方法是無參無傳回值的;由于
onCompleted()
方法也是無參無傳回值的,是以
Action1
可以被當成一個包裝對象,将
onCompleted()
的内容打包起來将自己作為一個參數傳入
subscribe()
以實作不完整定義的回調。除了Action1之外 RxJava還提供了其他的actionX系列方法(Action0...........Action9)
4、變換 (1)、map:一對一變換
Observable.just("/image/pp.png")
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
return null;
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
}
});
map将一個字元串轉換成bitMap後傳回了。 這裡出現了一個叫做
Func1
的類。它和
Action1
非常相似,也是 RxJava 的一個接口,用于包裝含有一個參數的方法。
Func1
和
Action
的差別在于,
Func1
包裝的是有傳回值的方法。另外,和
ActionX
一樣,
FuncX
也有多個,用于不同參數個數的方法。
FuncX
和
ActionX
的差別在
FuncX
包裝的是有傳回值的方法。
(2)、flatMap:一對多變換 原理: 1. 使用傳入的事件對象建立一個
Observable
對象; 2. 并不發送這個
Observable
, 而是将它激活,于是它開始發送事件; 3. 每一個建立出來的
Observable
發送的事件,都被彙入同一個
Observable
,而這個
Observable
負責将這些事件統一交給
Subscriber
的回調方法。
List<Student.Course> list = new ArrayList<>();
Student student = new Student();
Student.Course c = student.new Course("國文");
Student.Course c1 = student.new Course("數學");
Student.Course c2 = student.new Course("英語");
list.add(c);
list.add(c1);
list.add(c2);
List<Student.Course> list1 = new ArrayList<>();
Student student11 = new Student();
Student.Course c3 = student.new Course("實體");
Student.Course c4 = student.new Course("化學");
Student.Course c5 = student.new Course("生物");
list.add(c3);
list.add(c4);
list.add(c5);
Student student1 = new Student(10, "zhangsan", list);
Student student2 = new Student(10, "zhangsan", list1);
Student[] students = {student1, student2};
Subscriber<Student.Course> subscriber1 = new Subscriber<Student.Course>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Student.Course course) {
Log.e("ssssssss", course.courseName);
}
};
Observable.from(students).flatMap(new Func1<Student, Observable<Student.Course>>() {
@Override
public Observable<Student.Course> call(Student student) {
return Observable.from(student.course);
}
}).subscribe(subscriber1);
将一組學生中的課程全部列印。