天天看點

RxJava入門——概念篇

gitHubd位址: https://github.com/ReactiveX/RxJava 

https://github.com/ReactiveX/RxAndroid 

引入依賴:   compile 'io.reactivex:rxjava:1.0.14'       compile 'io.reactivex:rxandroid:1.0.1'

1、概念 RxJava基本概念:Observable(被觀察者),Observer(觀察者),subscribe(訂閱) ,被觀察者和觀察者通過subscribe方法實作訂閱關系。RxJava的事件回調方法有OnNext(),onCompleted(),onError()。  onCompleted:事件隊列完成是執行。RxJava不僅把每個事件單獨處理,還會把他們看做一個隊列。RxJava規定,當不會有新的onNext()發出時,需要觸發onCompleted()方法作為标志。   onError:事件隊列異常時執行。在事件處理過程中出現異常時onError會被觸發,同時隊列自動終止。不允許在有事件發出。 onCompleted和onError在一個正确運作的隊列中二者是互 斥的,整個隊列的執行隻會調用其中的一個方法。

RxJava入門——概念篇

2、基本實作 (1) 建立觀察者Observer  RxJava中的Observer的接口實作如下

Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) { 
    }

    @Override
    public void onCompleted() { 
    }

    @Override
    public void onError(Throwable e) { 
    }
};
           

除了以上的實作方式之外 RxJava還内置了一個實作了

Observer

 的抽象類:

Subscriber

Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(String s) {
            }

            @Override
            public void onStart() {
                super.onStart();
            }
        };
           

此實作類多了一個onStart()方法 可以用于一些初始化的操作。

(2) 建立被觀察者 Observable

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("111");
                subscriber.onNext("222");
                subscriber.onNext("333");
                subscriber.onCompleted();
            }
        });
           

Observable.create()是最基本的建立Observable方式,除此之外 還有just()  from()等方式。 在傳回的observable中可以通過isUnsubscribed()來判斷是否已經訂閱  最後通過unsubscribe()來解除訂閱。

(3) 通過subscribe将二者關聯起來

observable.subscribe(observer);
           

最後通過subscribe将oberver和observable進行關聯。 這樣一個訂閱事件就完成了。當 Observable一被訂閱就會觸發observer的call方法。此時 将執行三次onNext()和一次onCompleted()。

3、Scheduler      Scheduler是RxJava的線程排程器。主要有以下幾個:

    Schedulers.immediate():直接在目前線程執行,相當于不指定線程。是預設的Scheduler。

    Schedulers.newThread():總是啟用新線程,并在新線程執行操作。

    Schedulers.io():I/O操作(讀寫檔案 讀寫資料庫 網絡資訊互動) 所使用的Scheduler。行為模式和newThread()差不多,差別在于IO()的内部實作是用在一個無數量上限的線程池,可以重用空閑的線程。是以,多數情況下io()比newThread()更有效率。不要把計算工作放在io()中,可以避免建立不必要的線程。     Schedulers.computation:  計算所使用的 

Scheduler

。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 

Scheduler

 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 

computation()

 中,否則 I/O 操作的等待時間會浪費 CPU。

    除此之外,android還有一個專門的AndroidSchedulers.mainThread(),它指定的操作将在Android主線程運作。

有了切換線程的方法後專門使用呢?     RxJava提高了subscribeOn() 和 observeOn() 兩個方法對線程進行控制。

       subscribeOn():指定subscribe所發生的線程,即Observable.OnSubscribe被激活時所處的線程。也叫作時間産生的線程。

        observeOn():指定Subscriber所運作的線程。也叫作事件消費的線程。

String[] names = {"AA", "BB", "CC", "DD", "EE", "FF"};
        Observable.from(names)
                .subscribeOn(Schedulers.io())  // 指定 subscribe() 發生在 IO 線程
                .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.e("lw", "ACtion" + s);
                    }
                });
           

Action1

  這個方法是無參無傳回值的;由于 

onCompleted()

 方法也是無參無傳回值的,是以 

Action1

 可以被當成一個包裝對象,将 

onCompleted()

 的内容打包起來将自己作為一個參數傳入 

subscribe()

 以實作不完整定義的回調。除了Action1之外 RxJava還提供了其他的actionX系列方法(Action0...........Action9)

4、變換  (1)、map:一對一變換        

Observable.just("/image/pp.png")
                .map(new Func1<String, Bitmap>() {
                    @Override
                    public Bitmap call(String s) {


                        return null;
                    }
                })
                .subscribe(new Action1<Bitmap>() {
                    @Override
                    public void call(Bitmap bitmap) {

                    }
                });
           

map将一個字元串轉換成bitMap後傳回了。 這裡出現了一個叫做 

Func1

 的類。它和 

Action1

 非常相似,也是 RxJava 的一個接口,用于包裝含有一個參數的方法。  

Func1

 和 

Action

的差別在于, 

Func1

 包裝的是有傳回值的方法。另外,和 

ActionX

 一樣, 

FuncX

 也有多個,用于不同參數個數的方法。

FuncX

 和

ActionX

 的差別在 

FuncX

 包裝的是有傳回值的方法。 

(2)、flatMap:一對多變換 原理: 1. 使用傳入的事件對象建立一個 

Observable

 對象; 2. 并不發送這個 

Observable

, 而是将它激活,于是它開始發送事件; 3. 每一個建立出來的 

Observable

 發送的事件,都被彙入同一個 

Observable

 ,而這個 

Observable

 負責将這些事件統一交給

Subscriber

 的回調方法。           

List<Student.Course> list = new ArrayList<>();
        Student student = new Student();
        Student.Course c = student.new Course("國文");
        Student.Course c1 = student.new Course("數學");
        Student.Course c2 = student.new Course("英語");

        list.add(c);
        list.add(c1);
        list.add(c2);

        List<Student.Course> list1 = new ArrayList<>();
        Student student11 = new Student();
        Student.Course c3 = student.new Course("實體");
        Student.Course c4 = student.new Course("化學");
        Student.Course c5 = student.new Course("生物");

        list.add(c3);
        list.add(c4);
        list.add(c5);

        Student student1 = new Student(10, "zhangsan", list);
        Student student2 = new Student(10, "zhangsan", list1);
        Student[] students = {student1, student2};

        Subscriber<Student.Course> subscriber1 = new Subscriber<Student.Course>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Student.Course course) {
                Log.e("ssssssss", course.courseName);
            }
        };

        Observable.from(students).flatMap(new Func1<Student, Observable<Student.Course>>() {
            @Override
            public Observable<Student.Course> call(Student student) {
                return Observable.from(student.course);
            }
        }).subscribe(subscriber1);

           

将一組學生中的課程全部列印。