天天看點

Rxjava學習1 基本概念2 排程器Schedulers

1 基本概念

1.1 Rx概念

一個在Java VM上使用可觀測的序列來組成異步的、基于事件的程式的庫,

其實 RxJava 的本質就是一個可以實作異步操作的庫

1.2 Rx優勢

同樣是做異步,為什麼人們用它,而不用現成的 Async / Future / XXX / ... 一個詞:簡潔! 異步操作很關鍵的一點是程式的簡潔性,因為在排程過程比較複雜的情況下,異步代碼經常會既難寫也難被讀懂。 随着程式邏輯變得越來越複雜,它依然能夠保持簡潔,對日後代碼維護省去不少力氣。k

1.3 Rx結構

響應式程式設計的主要組成部分是observable, operator和susbscriber,網上大多數文章都是介紹說有兩部分,我這裡把operator操作符也加進去了,這樣對結構的整體性會有更全面的認識)。 一般響應式程式設計的資訊流如下所示:

Observable > Operator 1 > Operator 2 > Operator 3 > Subscriber

也就是說,observable是事件的生産者,subscriber是事件最終的消費者。 因為subscriber通常在主線程中執行,是以設計上要求其代碼盡可能簡單,隻對事件進行響應,而修改事件的工作全部由operator執行。 如果我們不需要修改事件,就不需要在observable和subscriber中插入operator。這時的Rx結構如下:

Obsevable > Subscriber

這裡看起來跟設計模式中的觀察者模式很像,他們最重要的差別之一在于在沒有subscriber(觀察者模式中的observer)之前,observable(被觀察者)不會産生事件。

1.4 最簡單的模式

Observable observable = Observable.create(new Observable.OnSubscribe<Student>() {
            @Override
            public void call(Subscriber<? super Student> subscriber) {
                for (int i = 0; i < students.size(); i++) {
                    subscriber.onNext(students.get(i));
                }
                subscriber.onCompleted();
            }
        });
        //觀察者1
        Subscriber<Student> subscriber = new Subscriber<Student>() {
            @Override
            public void onCompleted() {
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(Student student) {
                System.out.println(student.getAge());
            }
        };
        observable.subscribe(subscriber);
           

如果我們不關心subscriber是否結束(onComplete())或者發生錯誤(onError()),subscriber的代碼可以簡化為

Observable.just(student1,student2,student3,student4).subscribe(new Action1<Student>() {
    @Override
    public void call(Student student) {
        log(student.getAge());
    }
});
           

Observable.from(students).subscribe(new Action1<Stußßdent>() {
    @Override
    public void call(Student student) {
        log(student.getAge());
    }
});
           

很明顯跟subscriber比起來,action相當于隻有onNext()方法,是以,這種方式叫做不完整定義。

1.5 加入operator

1.5.1 map操作符

很多時候,我們需要針對處理過的事件做出響應,而不僅僅是Observable産生的原始事件。 意淫一下,加入我還要再輸入每個student的自我介紹怎麼辦?傳統方法for循環? 可以,但rxjava可以這麼做

Observable.from(students)
                .map(new Func1<Student, Student>() {
                    @Override
                    public Student call(Student student) {
                        log(student.getIntroduce());
                        return student;
                    }
                })
                .subscribe(new Action1<Student>() {
                    @Override
                    public void call(Student student) {
                        log(student.getAge());
                    }
                });
           

1.5.2 flatMap操作符

坑爹,繁瑣的需求又來了,每個student可以選擇不同的課程,1對多... 怎麼操作這些課程呢?傳統方式嵌套for循環? No,rxjava可以這麼做

Observable.from(students)
            .flatMap(new Func1<Student, Observable<Course>>() {
                @Override
                public Observable<Course> call(Student student) {
                    log("name:" + student.getName());
                    return Observable.from(student.getCourses());
                }
            })
            .map(new Func1<Course, Course>() {

                @Override
                public Course call(Course course) {
                    log("course:" + course.getName());
                    return course;
                }
            })
            .subscribe(new Subscriber<Course>() {
                @Override
                public void onCompleted() {
                    log("onCompleted");
                }

                @Override
                public void onError(Throwable throwable) {
                    log("onError");
                }

                @Override
                public void onNext(Course course) {
                    log(" teacher:" + course.getTeacherName());
                }
            });
           

1.5.3 filter操作符

如果我想在所有的學生中找出19歲以上怎麼辦?filter!

Observable.from(studentList)
            //擷取19歲以上的
            .filter(new Func1<Student, Boolean>() {
                @Override
                public Boolean call(Student student) {
                    return student.getAge() >= 19;
                }
            })
            .flatMap(new Func1<Student, Observable<Course>>() {
                @Override
                public Observable<Course> call(Student student) {
                    System.out.println("student:" + student.getName() + " age:" + student.getAge());
                    return Observable.from(student.getCourses());
                }
            })
            .map(new Func1<Course, Course>() {

                @Override
                public Course call(Course course) {
                    System.out.print(" course:" + course.getName());
                    return course;
                }
            })
            .subscribe(new Subscriber<Course>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted");
                }

                @Override
                public void onError(Throwable throwable) {
                    System.out.println("onError");
                }

                @Override
                public void onNext(Course course) {
                    System.out.println(" teacher:" + course.getTeacherName());
                }
            });
           

1.5.4 take操作符

如果我不想要這麼多學生我隻想要一個怎麼辦呢?take!

Observable.from(studentList)
            .filter(new Func1<Student, Boolean>() {
                @Override
                public Boolean call(Student student) {
                    return student.getAge() >= 19;
                }
            })
            .take(1)
            .flatMap(new Func1<Student, Observable<Course>>() {
                @Override
                public Observable<Course> call(Student student) {
                    System.out.println("student:" + student.getName() + " age:" + student.getAge());
                    return Observable.from(student.getCourses());
                }
            })
            .map(new Func1<Course, Course>() {

                @Override
                public Course call(Course course) {
                    System.out.print(" course:" + course.getName());
                    return course;
                }
            })
            .subscribe(new Subscriber<Course>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted");
                }

                @Override
                public void onError(Throwable throwable) {
                    System.out.println("onError");
                }

                @Override
                public void onNext(Course course) {
                    System.out.println(" teacher:" + course.getTeacherName());
                }
            });
           

rxjava提供大量的操作,這裡隻介紹幾個最基本的,有興趣的同學可以會下自行學習.

1.6 取消訂閱

實際上執行Observable.subscribe()時,它會傳回一個Subscrition,它代表了Observable和Subscriber之間的關系。你可以通過Subscrition解除Observable和Subscriber之間的訂閱關系,并立即停止執行整個訂閱鍊。

subscription.unsubscribe();

2 排程器Schedulers

2.1 基本概念

在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在哪個線程調用 subscribe(),就在哪個線程生産事件;在哪個線程生産事件,就在哪個線程消費事件。

2.2 排程類别

Schedulers類别有很多種,下面介紹幾個常用的,Android專用的在此不做介紹,有興趣的同學請線下交流。

Schedulers.immediate(): 直接在目前線程運作,相當于不指定線程。這是預設的 Scheduler。

Schedulers.newThread(): 總是啟用新線程,并在新線程執行操作。

Schedulers.io(): 行為模式和 newThread() 差不多,差別在于 io() 的内部實作是是用一個無數量上限的線程池,可以重用空閑的線程,是以多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的線程。

2.3 排程器的使用

subscribeOn():指定 subscribe() 所發生的線程,或者叫做事件産生的線程

Observable observable = Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        for (int i = 0; i < students.size(); i++) {
            LogUtil.log(Thread.currentThread().getName());
            subscriber.onNext(students.get(i));
        }
        subscriber.onCompleted();
    }
});
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Student student) {
        LogUtil.log(Thread.currentThread().getName() + ":" + student.getAge());
    }
};
LogUtil.log(Thread.currentThread().getName());
observable
        .subscribeOn(Schedulers.newThread())
        .subscribe(subscriber);
TimeUnit.SECONDS.sleep(1);
           

輸出:

main

RxNewThreadScheduler-1

RxNewThreadScheduler-1:18

RxNewThreadScheduler-1

RxNewThreadScheduler-1:19

RxNewThreadScheduler-1

RxNewThreadScheduler-1:20

Rxjava學習1 基本概念2 排程器Schedulers

 從上圖可以看出,線程切換發生在 OnSubscribe 中,即在它通知上一級 OnSubscribe 時,這時事件還沒有開始發送,是以 subscribeOn() 的線程控制可以從事件發出的開端就造成影響;

observeOn():指定 Subscriber 所運作在的線程。或者叫做事件消費的線程

Observable observable = Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        for (int i = 0; i < students.size(); i++) {
            LogUtil.log(Thread.currentThread().getName());
            subscriber.onNext(students.get(i));
        }
        subscriber.onCompleted();
    }
});
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onCompleted() {  
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Student student) {
        LogUtil.log(Thread.currentThread().getName() + ":" + student.getAge());
    }
};
LogUtil.log(Thread.currentThread().getName());
observable
        .observeOn(Schedulers.newThread())
        .subscribe(subscriber);
TimeUnit.SECONDS.sleep(1);
           

輸出:

main

main

main

main

RxNewThreadScheduler-1:18

RxNewThreadScheduler-1:19

RxNewThreadScheduler-1:20

Rxjava學習1 基本概念2 排程器Schedulers

從上圖可以看出,線程切換發生在它内建的 Subscriber 中,即發生在它即将給下一級 Subscriber 發送事件時,是以 observeOn() 控制的是它後面的線程 3 應用場景 3.1 緩存降級 3.2 異步接口

可以實作異步接口調用的方法有很多,比如@Async/Future/CountdownLatch...