1 基本概念
1.1 Rx概念
一個在Java VM上使用可觀測的序列來組成異步的、基于事件的程式的庫,
其實 RxJava 的本質就是一個可以實作異步操作的庫
1.2 Rx優勢
同樣是做異步,為什麼人們用它,而不用現成的 Async / Future / XXX / ... 一個詞:簡潔! 異步操作很關鍵的一點是程式的簡潔性,因為在排程過程比較複雜的情況下,異步代碼經常會既難寫也難被讀懂。 随着程式邏輯變得越來越複雜,它依然能夠保持簡潔,對日後代碼維護省去不少力氣。k
1.3 Rx結構
響應式程式設計的主要組成部分是observable, operator和susbscriber,網上大多數文章都是介紹說有兩部分,我這裡把operator操作符也加進去了,這樣對結構的整體性會有更全面的認識)。 一般響應式程式設計的資訊流如下所示:
Observable > Operator 1 > Operator 2 > Operator 3 > Subscriber
也就是說,observable是事件的生産者,subscriber是事件最終的消費者。 因為subscriber通常在主線程中執行,是以設計上要求其代碼盡可能簡單,隻對事件進行響應,而修改事件的工作全部由operator執行。 如果我們不需要修改事件,就不需要在observable和subscriber中插入operator。這時的Rx結構如下:
Obsevable > Subscriber
這裡看起來跟設計模式中的觀察者模式很像,他們最重要的差別之一在于在沒有subscriber(觀察者模式中的observer)之前,observable(被觀察者)不會産生事件。
1.4 最簡單的模式
Observable observable = Observable.create(new Observable.OnSubscribe<Student>() {
@Override
public void call(Subscriber<? super Student> subscriber) {
for (int i = 0; i < students.size(); i++) {
subscriber.onNext(students.get(i));
}
subscriber.onCompleted();
}
});
//觀察者1
Subscriber<Student> subscriber = new Subscriber<Student>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Student student) {
System.out.println(student.getAge());
}
};
observable.subscribe(subscriber);
如果我們不關心subscriber是否結束(onComplete())或者發生錯誤(onError()),subscriber的代碼可以簡化為
Observable.just(student1,student2,student3,student4).subscribe(new Action1<Student>() {
@Override
public void call(Student student) {
log(student.getAge());
}
});
或
Observable.from(students).subscribe(new Action1<Stußßdent>() {
@Override
public void call(Student student) {
log(student.getAge());
}
});
很明顯跟subscriber比起來,action相當于隻有onNext()方法,是以,這種方式叫做不完整定義。
1.5 加入operator
1.5.1 map操作符
很多時候,我們需要針對處理過的事件做出響應,而不僅僅是Observable産生的原始事件。 意淫一下,加入我還要再輸入每個student的自我介紹怎麼辦?傳統方法for循環? 可以,但rxjava可以這麼做
Observable.from(students)
.map(new Func1<Student, Student>() {
@Override
public Student call(Student student) {
log(student.getIntroduce());
return student;
}
})
.subscribe(new Action1<Student>() {
@Override
public void call(Student student) {
log(student.getAge());
}
});
1.5.2 flatMap操作符
坑爹,繁瑣的需求又來了,每個student可以選擇不同的課程,1對多... 怎麼操作這些課程呢?傳統方式嵌套for循環? No,rxjava可以這麼做
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
log("name:" + student.getName());
return Observable.from(student.getCourses());
}
})
.map(new Func1<Course, Course>() {
@Override
public Course call(Course course) {
log("course:" + course.getName());
return course;
}
})
.subscribe(new Subscriber<Course>() {
@Override
public void onCompleted() {
log("onCompleted");
}
@Override
public void onError(Throwable throwable) {
log("onError");
}
@Override
public void onNext(Course course) {
log(" teacher:" + course.getTeacherName());
}
});
1.5.3 filter操作符
如果我想在所有的學生中找出19歲以上怎麼辦?filter!
Observable.from(studentList)
//擷取19歲以上的
.filter(new Func1<Student, Boolean>() {
@Override
public Boolean call(Student student) {
return student.getAge() >= 19;
}
})
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
System.out.println("student:" + student.getName() + " age:" + student.getAge());
return Observable.from(student.getCourses());
}
})
.map(new Func1<Course, Course>() {
@Override
public Course call(Course course) {
System.out.print(" course:" + course.getName());
return course;
}
})
.subscribe(new Subscriber<Course>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError");
}
@Override
public void onNext(Course course) {
System.out.println(" teacher:" + course.getTeacherName());
}
});
1.5.4 take操作符
如果我不想要這麼多學生我隻想要一個怎麼辦呢?take!
Observable.from(studentList)
.filter(new Func1<Student, Boolean>() {
@Override
public Boolean call(Student student) {
return student.getAge() >= 19;
}
})
.take(1)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
System.out.println("student:" + student.getName() + " age:" + student.getAge());
return Observable.from(student.getCourses());
}
})
.map(new Func1<Course, Course>() {
@Override
public Course call(Course course) {
System.out.print(" course:" + course.getName());
return course;
}
})
.subscribe(new Subscriber<Course>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError");
}
@Override
public void onNext(Course course) {
System.out.println(" teacher:" + course.getTeacherName());
}
});
rxjava提供大量的操作,這裡隻介紹幾個最基本的,有興趣的同學可以會下自行學習.
1.6 取消訂閱
實際上執行Observable.subscribe()時,它會傳回一個Subscrition,它代表了Observable和Subscriber之間的關系。你可以通過Subscrition解除Observable和Subscriber之間的訂閱關系,并立即停止執行整個訂閱鍊。
subscription.unsubscribe();
2 排程器Schedulers
2.1 基本概念
在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在哪個線程調用 subscribe(),就在哪個線程生産事件;在哪個線程生産事件,就在哪個線程消費事件。
2.2 排程類别
Schedulers類别有很多種,下面介紹幾個常用的,Android專用的在此不做介紹,有興趣的同學請線下交流。
Schedulers.immediate(): 直接在目前線程運作,相當于不指定線程。這是預設的 Scheduler。
Schedulers.newThread(): 總是啟用新線程,并在新線程執行操作。
Schedulers.io(): 行為模式和 newThread() 差不多,差別在于 io() 的内部實作是是用一個無數量上限的線程池,可以重用空閑的線程,是以多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的線程。
2.3 排程器的使用
subscribeOn():指定 subscribe() 所發生的線程,或者叫做事件産生的線程
Observable observable = Observable.create(new Observable.OnSubscribe<Student>() {
@Override
public void call(Subscriber<? super Student> subscriber) {
for (int i = 0; i < students.size(); i++) {
LogUtil.log(Thread.currentThread().getName());
subscriber.onNext(students.get(i));
}
subscriber.onCompleted();
}
});
Subscriber<Student> subscriber = new Subscriber<Student>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Student student) {
LogUtil.log(Thread.currentThread().getName() + ":" + student.getAge());
}
};
LogUtil.log(Thread.currentThread().getName());
observable
.subscribeOn(Schedulers.newThread())
.subscribe(subscriber);
TimeUnit.SECONDS.sleep(1);
輸出:
main
RxNewThreadScheduler-1
RxNewThreadScheduler-1:18
RxNewThreadScheduler-1
RxNewThreadScheduler-1:19
RxNewThreadScheduler-1
RxNewThreadScheduler-1:20
從上圖可以看出,線程切換發生在 OnSubscribe 中,即在它通知上一級 OnSubscribe 時,這時事件還沒有開始發送,是以 subscribeOn() 的線程控制可以從事件發出的開端就造成影響;
observeOn():指定 Subscriber 所運作在的線程。或者叫做事件消費的線程
Observable observable = Observable.create(new Observable.OnSubscribe<Student>() {
@Override
public void call(Subscriber<? super Student> subscriber) {
for (int i = 0; i < students.size(); i++) {
LogUtil.log(Thread.currentThread().getName());
subscriber.onNext(students.get(i));
}
subscriber.onCompleted();
}
});
Subscriber<Student> subscriber = new Subscriber<Student>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Student student) {
LogUtil.log(Thread.currentThread().getName() + ":" + student.getAge());
}
};
LogUtil.log(Thread.currentThread().getName());
observable
.observeOn(Schedulers.newThread())
.subscribe(subscriber);
TimeUnit.SECONDS.sleep(1);
輸出:
main
main
main
main
RxNewThreadScheduler-1:18
RxNewThreadScheduler-1:19
RxNewThreadScheduler-1:20
從上圖可以看出,線程切換發生在它内建的 Subscriber 中,即發生在它即将給下一級 Subscriber 發送事件時,是以 observeOn() 控制的是它後面的線程 3 應用場景 3.1 緩存降級 3.2 異步接口
可以實作異步接口調用的方法有很多,比如@Async/Future/CountdownLatch...