天天看點

RxJava學習筆記Rxjava 的原理

Rxjava 的原理

  1. Observable:發射源,可觀察的,在官擦着模式中被稱為”被觀察者
  2. Observer:接收源,觀察者,可接受Observable,subject發送的資料
  3. Subject:即可當接受者,也可以當被觀察者
  4. Subscriber: 訂閱者,接收源,實作了Observer接口,比Observer多了一個接口unSubscribe取消訂閱,建議使用Subscriber接收源.
  5. Subscription: Observable 調用subscribe() 方法傳回的對象,同樣有unsbscribe()方法,可以用來取消訂閱事件.
  6. Action0: Rxjava 中的一個接口,他隻有一個無參call()方法,且無傳回值,同樣還有Action1(),Action2(),,,封裝了一個參數,兩個參數,一直到9個參數的
  7. Fun0(): 于Action0相同,隻是這個有傳回值的,相同的也是從0-9 依次帶參數;

Observable的使用

  • 使用create建立(基本不用)
Observable normalObservable = Observable.create(new Observable.OnSubscribe<String>() {
        @Overridepublic void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("create1"); //發射一個"create1"的String
            subscriber.onNext("create2"); //發射一個"create2"的String
            subscriber.onCompleted();//發射完成,這種方法需要手動調用onCompleted,才會回調Observer的onCompleted方法
        }});
           
  • 使用 just() 将為你建立一個Observable并自動為你調用onNext( )發射資料
  • 使用from周遊集合,依次發送每個item
List<String> list = new ArrayList<>();
list.add("from1");
list.add("from2");
list.add("from3");
fromObservable = Observable.from(list);  //周遊list 每次發送一個
/** 注意,just()方法也可以傳list,但是發送的是整個list對象,而from()發送的是list的一個item** /
           
  • 使用defer() 有觀察者的時候才建立被觀察者,并且每次都建立一個新的Obsevable對象
deferObservable = Observable.defer(new Func0<Observable<String>>() {
  @Override
  //注意此處的call方法沒有Subscriber參數public Observable<String> call() {
      return Observable.just("deferObservable");
  }});
           
  • 使用interval() 設定每隔多長時間請求一次,通常用于計時器,從1忘上數的即時,如果想要倒數,注意線程的切換,使用map從60-i,即可
intervalObservable = Observable.interval(, TimeUnit.SECONDS);//每隔一秒發送一次
//interval()是運作在computation Scheduler線程中的,是以需要轉到主線程
mSubscription=Observable
        .interval(, TimeUnit.SECONDS)
        .observeOn(AndroidSchedulers.mainThread())
        .map(new Func1<Long, Long>() { //使用map将資料轉化

            @Override
            public Long call(Long aLong) {
                return  - aLong;
            }
        })
          .subscribe(new Action1<Long>() {
              @Override
              public void call(Long aLong) {
                  mText.setText(aLong+"S");
              }
          });
           
  • 使用range()建立一個發送特定整數數列的Observable,第一個參數為起始值,第二個參數為發送的長度,如果為0不發送,如果為負抛出異常;
  • 使用timer(),建立一個Observable,給定一個延遲後發射一條資料,等同于Android中的Handler的postDely()方法,注意線程的切換,這個是運作咋子線程的
  • 使用repeat()建立一個重複發送特定資料的ObserVable

Observer的建立

1.普通接受,基本不用

mObserver = new Observer<String>() {
  @Overridepublic void onCompleted() {
      LogUtil.log("onCompleted"); // 全部列印時候調用
  }
  @Overridepublic void onError(Throwable e) {
  }
  @Overridepublic void onNext(String s) {
      LogUtil.log(s); // 依次列印資料先OnNext的先列印
  }};
           
  1. 可以使用subscribe中的Action1接受傳入
Subscribe的源碼
//一個參數的Action
public final Subscription subscribe(final Action1<? super T> onNext)
//帶有錯誤的參數
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError)
//帶有完成的參數
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted)
//示例
Observable.just(1,2,3,4).subscribe(new Action1<Long>() {
               @Override
               public void call(Long aLong) {
                   mText.setText(aLong + "S");
               }
           },
        new Action1<Throwable>() {


            @Override
            public void call(Throwable e) {
                //錯誤的方法
                e.printStackTrace();

            }
        }, new Action0() {

            @Override
            public void call() {
                //這個是全部完成的方法
                System.out.println("我是全部完成的");

            }
        }
);
           

Subject

定義

  • Subject可以看成是一個橋梁或者代理,在某些ReactiveX實作中(如RxJava),它同時充當了Observer和Observable的角色。因為它是一個Observer,它可以訂閱一個或多個Observable;又因為它是一個Observable,它可以轉發它收到(Observe)的資料,也可以發射新的資料.
    1. 它可以充當Observable;
    2. 它可以充當Observer;
    3. 它是Observable和Observer之間的橋梁;
  • Subject是一個抽象類,不能通過new來執行個體化Subject,是以Subject有四個實作類,分别為AsyncSubject、BehaviorSubject、PublishSubject和ReplaySubject,每個實作類都有特定的“技能”,下面結合代碼來介紹一下它們各自的“技能”。注意,所有的實作類都由create()方法執行個體化,無需new,所有的實作類調用onCompleted()或onError(),它的Observer将不再接收資料;
  • AsyncSubject:Observer會接收AsyncSubject的`onComplete()之前的最後一個資料,如果因異常而終止,AsyncSubject将不會釋放任何資料,但是會向Observer傳遞一個異常通知.
//隻會接受onCompleted之前的第一個onNext的值;如果不調用onCompleted(),Subscriber将不接收任何資料。
AsyncSubject<String> strAsyncSubject = AsyncSubject.create();
strAsyncSubject.onNext("我");
strAsyncSubject.onNext("愛");
strAsyncSubject.onNext("你");
strAsyncSubject.onCompleted();
strAsyncSubject.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {

        // System.out: 你
        System.out.println(s);
    }
});
           
  • BehaviorSubject:

    Observer會接收到BehaviorSubject被訂閱之前的最後一個資料,再接收其他發射過來的資料,如果BehaviorSubject被訂閱之前沒有發送任何資料,則會發送一個預設資料。(注意跟AsyncSubject的差別,AsyncSubject要手動調用onCompleted(),且它的Observer會接收到onCompleted()前發送的最後一個資料,之後不會再接收資料,而BehaviorSubject不需手動調用onCompleted(),它的Observer接收的是BehaviorSubject被訂閱前發送的最後一個資料,兩個的分界點不一樣,且之後還會繼續接收資料。)示例代碼如下:

BehaviorSubject<String> strBehaviorSubject = BehaviorSubject.create("夢");
strBehaviorSubject.onNext("我");
strBehaviorSubject.onNext("愛");
//此時被訂閱會找到上一個onNext的值:愛, 如果沒有onNext會發送一個預設值create裡面的:夢
strBehaviorSubject.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        ///System.out: 愛
        //System.out: 你
        System.out.println(s);
    }
});

strBehaviorSubject.onNext("你");
           
  • PublishSubject: 它的Observer隻會接收到PublishSubject被訂閱之後發送的資料。
PublishSubject<String> strPublishSubject = PublishSubject.create();
strPublishSubject.onNext("我");
strPublishSubject.onNext("愛");
strPublishSubject.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {

        //System.out: 你
        //System.out: !
        System.out.println(s);
    }
});

strPublishSubject.onNext("你");
strPublishSubject.onNext("!");
           
  • ReplaySubject

    ReplaySubject會發射所有資料給觀察者,無論它們是何時訂閱的。也有其它版本的ReplaySubject,在重放緩存增長到一定大小的時候或過了一段時間後會丢棄舊的資料。

//建立預設初始緩存容量大小為的ReplaySubject,當資料條目超過會重新配置設定記憶體空間,使用這種方式,不論ReplaySubject何時被訂閱,Observer都能接收到資料
ReplaySubject<String> replaySubject = ReplaySubject.create();
// 建立指定初始緩存容量大小為的ReplaySubject
//replaySubject = ReplaySubject.create();
// 隻緩存訂閱前最後發送的條資料
//replaySubject = ReplaySubject.createWithSize();
// replaySubject被訂閱前的前秒内發送的資料才能被接收
//replaySubject=ReplaySubject.createWithTime(,TimeUnit.SECONDS,Schedulers.computation());
replaySubject.onNext("replaySubject:pre1");
replaySubject.onNext("replaySubject:pre2");
replaySubject.onNext("replaySubject:pre3");
replaySubject.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        //由上到下依次列印資料
        System.out.println(s);
    }
});
replaySubject.onNext("replaySubject:after1");
replaySubject.onNext("replaySubject:after2");
           
  • SerializedSubject

    Subject 當作一個 Subscriber 使用,不要從多個線程中調用它的onNext方法(包括其它的on系列方法),這可能導緻同時(非順序)調用,這會違反Observable協定,給Subject的結果增加了不确定性。要避免此類問題,官方提出了“串行化”,你可以将 Subject 轉換為一個 SerializedSubject

SerializedSubject<String, Integer> ser = new SerializedSubject(publishSubject);
           
  • subject:Subject繼承了Observable,又實作了Observer接口,是以說它既是Observable又是Observer,是以Subject也能實作Observable和Observer相同的功能
//使用subject實作訂閱跟發送
PublishSubject<String> publishSubject = PublishSubject.create();
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {

        subscriber.onNext("as Bridge");
        subscriber.onNext("as Bridge");
        subscriber.onNext("as Bridge");
        subscriber.onCompleted();
    }
}).subscribe(publishSubject);

publishSubject.subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

        System.out.println("我是錯誤的");
        e.printStackTrace();
    }

    @Override
    public void onNext(String s) {

        System.out.println("woshi" + s);
    }
});
           

Scheduler:排程器

RxJava學習筆記Rxjava 的原理
RxJava學習筆記Rxjava 的原理

注意SubscribeOn隻能調取一次,以第一次為準,并使其上的代碼運作在其指定的線程内,下面如果再次調用将沒有效果,ObserveOn其下面的代碼運作在其指定的線程内,注意一些耗時操作的操作符需要更改線程,切記切記切記,重要的事情講三遍

思考:我們經常多次使用subscribeOn切換線程,那麼以後是否可以組合observeOn和subscribeOn達到自由切換的目的呢?

  1. 組合是可以的,但是他們的執行順序是有條件的,如果仔細分析的話,可以知道observeOn調用之後,再調用subscribeOn是無效的,原因是什麼?
  2. 因為subscribeOn改變的是subscribe這句調用所在的線程,大多數情況,産生内容和消費内容是在同一線程的,是以改變了産生内容所在的線程,就改變了消費内容所在的線程。
  3. 經過上面的闡述,我們知道,observeOn的工作原理是把消費結果先緩存,再切換到新線程上讓原始消費者消費,它和生産者是沒有一點關系的,就算subscribeOn調用了,也隻是改變observeOn這個消費者所在的線程,和OperatorObserveOn中存儲的原始消費者一點關系都沒有,它還是由observeOn控制。
  4. 下面提到的“操作”包括産生事件、用操作符操作事件以及最終的通過 subscriber 消費事件;
    1. 隻有第一subscribeOn() 起作用(是以多個 subscribeOn() 無意義);
    2. 這個 subscribeOn() 控制從流程開始的第一個操作,直到遇到第一個 observeOn();
    3. observeOn() 可以使用多次,每個 observeOn() 将導緻一次線程切換(),這次切換開始于這次 observeOn() 的下一個操作;
    4. 不論是 subscribeOn() 還是 observeOn(),每次線程切換如果不受到下一個 observeOn() 的幹預,線程将不再改變,不會自動切換到其他線程。

常用的操作符

*Map:最常用且最實用的操作符之一,将對象轉換成另一個對象發射出去,應用範圍非常廣,如資料的轉換,資料的預處理等

Observable.just("12345678").map(new Func1<String, String>() {
  @Override
  public String call(String s) {
      return s.substring(,);//隻要前四位
  }})
.subscribe(new Action1<String>() {
  @Override
  public void call(String s) {
      Log.i("mytag",s);
  }});
           
  • FlatMap:和Map很像但又有所差別,Map隻是轉換發射的資料類型,而FlatMap可以将原始Observable轉換成另一個Observable。
Observable.from(schoolList).flatMap(new Func1<School, Observable<School.Student>>() {
  @Override
  public Observable<School.Student> call(School school) {

      return Observable.from(school.getStudentList()); //關鍵,将學生清單以另外一個Observable發射出去

  }}).subscribe(new Action1<School.Student>() {

  @Override
  public void call(School.Student student) {
      Log.i("mytag",student.getName());
  }});
           
  • Buffer :緩存,可以設定緩存大小,緩存滿了,以List方式将資料發送,通常跟Map一起使用:注意線程的排程
List<School> schoolList = new ArrayList<>();
Observable.from(schoolList).map(new Func1<School, School>() {
  @Overridepublic School call(School school) {
      school.setName("NB大學");  //将所有學校改名return school;
  }}).buffer(schoolList.size())  //緩存起來,最後一起發送
.subscribe(new Action1<List<School>>() {
  @Overridepublic void call(List<School> schools) {   
}});
           
  • Take:發送前n項資料
Observable.from(schoolList).take().map(new Func1<School, School>() {
  @Override
  public School call(School school) {
      school.setName("NB大學");
      return school;
  }}).buffer().subscribe(new Action1<List<School>>() {
  @Override
  public void call(List<School> schools) {
  }});
           
  • Distinct:去掉重複的項
Observable.just(, , , , , )
      .distinct()
      .subscribe(new Action1<Integer>() {
          @Override
          public void call(Integer item) {
              System.out.println("Next: " + item);
          }
      });
           
  • Filter: 過濾
Observable.just(, , , , )
      .filter(new Func1<Integer, Boolean>() {
          @Override
          public Boolean call(Integer item) {
              return( item <  );
          }
      }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer item) {
              System.out.println("Next: " + item);
    }});
           
  • marge兩個任務并發進行,全部處理完畢之後在更新資料
Observable obs1=Observable.create(new Observable.OnSubscribe<String>(){

    @Override
    public void call(Subscriber<? super String> subscriber) {
        try {
            Thread.sleep();
            subscriber.onNext(" aaa");
            subscriber.onCompleted();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}).subscribeOn(Schedulers.newThread());

Observable obs2=Observable.create(new Observable.OnSubscribe<String>(){

    @Override
    public void call(Subscriber<? super String> subscriber) {
        try {
            Thread.sleep();
            subscriber.onNext("bbb");
            subscriber.onCompleted();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}).subscribeOn(Schedulers.newThread());

Observable.merge(obs1,obs2)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<String>() {
            StringBuffer sb=new StringBuffer();
            @Override
            public void onCompleted() {
                mText.append("兩個任務都處理完畢!!\n");
                mText.append("更新資料:"+sb+"\n");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                sb.append( s+",");
                mText.append("得到一個資料:"+s+"\n");
            }
        });
           
  • RxBinding跟Rxjava相結合
RxTextView.textChanges(mEdit)
        //在一次事件發生後的一段時間内沒有新操作,則發出這次事件
        .debounce(,TimeUnit.MILLISECONDS)
        //轉換線程
        .observeOn(Schedulers.newThread())
        //通過輸入的資料,來比對"資料庫"中的資料進而提示。。
        .map(new Func1<CharSequence, List<String>>() {
            List<String> list=new ArrayList<String>();
            @Override
            public List<String> call(CharSequence charSequence) {

                if (charSequence.toString().contains("1")){
                    for (int i=;i<;i++){
                        list.add("11"+i);
                    }
                }
                return list;
            }
        })
        //由于我不想要listl清單,是以使用了flatMap來分解成一個一個的資料發送
        .flatMap(new Func1<List<String>, Observable<String>>() {
            @Override
            public Observable<String> call(List<String> strings) {

                return Observable.from(strings);
            }
        })
        //這裡切換成主線程,不然沒法操作元件
        .observeOn(AndroidSchedulers.mainThread())
        //這裡做一些過濾動作
        .filter(new Func1<String, Boolean>() {
            @Override
            public Boolean call(String s) {
                return !mText.getText().toString().contains(s);
            }
        })
        //訂閱
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                //這裡展示提示資料
                mText.append(s + "\n");
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                Log.w("DDDDDDDD",throwable.getMessage().toString());
            }
        });
mBtn.setText("連續點選防誤觸");
RxView.clicks(mBtn)
        //防誤觸(設定點選後500ms内丢棄新事件,或者說點選後500ms毫秒無響應)
        .throttleFirst(, TimeUnit.MILLISECONDS)
        .subscribe(new Action1<Void>() {
            //這就相當于OnClickListener中的OnClick方法回調
            @Override
            public void call(Void aVoid) {
               mText.append("\n 防誤觸 測試  \n");
            }
        });
           

*doOnNext:每次觀察者中的onNext調用之前調用

//        Integer [] number={1,2,3,4,5,6,7,8,9,10};
        Observable.from(number)
                  .filter(new Func1<Integer, Boolean>() {
                      @Override
                      public Boolean call(Integer integer) {
                          return integer%!=;
                      }
                  })
                    //取前四個
                    .take()
                    //取前四個中的後兩個
                    .takeLast()
                    .doOnNext(new Action1<Integer>() { //每個onNext之前調用
                        @Override
                        public void call(Integer integer) {
                            mText.append("before onNext()\n");
                        }
                    })
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            mText.append("onNext()--->"+integer+"\n");
                        }
                    });

                    //system:before onNext()
                    //onNext()--->5
                    //system:before onNext()
                    //onNext()--->7
           

*toSortedList() :為事件中的資料排序

Integer [] words={,,,,,,,,,};
Observable.from(words)
          .toSortedList()
           .flatMap(new Func1<List<Integer>, Observable<Integer>>() {
               @Override
               public Observable<Integer> call(List<Integer> strings) {
                   return Observable.from(strings);
               }
           })
          .subscribe(new Action1<Integer>() {
              @Override
              public void call(Integer strings) {
                  mText.append(strings+"\n");
              }
          });
           
  • connect模式: Observable發送事件1-6,兩個觀察者同時觀察這個Observable \n要求:每發出一個事件,觀察者A和觀察者都會收到,而不是先把所有的時間發送A,然後再發送給B
//        Integer [] integer={1,2,3,4,5,6};
        //正常模式 A先接受完數組在給B接受
        Observable  observable= Observable.from(integer);
        Action1 a1=new Action1<Integer>(){
            @Override
            public void call(Integer o) {
                mText.append("觀察者A  收到:  "+o+"\n");
            }
        };
        Action1 a2=new Action1<Integer>(){
            @Override
            public void call(Integer o) {
                mText.append("觀察者B  收到:  "+o+"\n");
            }
        };

        observable.subscribe(a1);
        observable.subscribe(a2);
//connect模式下A跟B一起收模式為1,1,2,2,3,3...就是A,B,A,B...
ConnectableObservable  observable= Observable.from(integer)
                                            .publish();//将一個Observable轉換為一個可連接配接的Observable

Action1 a1=new Action1<Integer>(){
    @Override
    public void call(Integer o) {
        mText.append("觀察者A  收到:  "+o+"\n");
    }
};
Action1 a2=new Action1<Integer>(){
    @Override
    public void call(Integer o) {
        mText.append("觀察者B  收到:  "+o+"\n");
    }
};

observable.subscribe(a1);
observable.subscribe(a2);
observable.connect();
           
  • timestamp() :為每個事件加上一個時間戳
Integer [] words={,,,,,,,,,};
        Observable.from(words)
                .timestamp()
//                .timestamp(Schedulers.io()) 可指定線程環境,如果指定到子線程,請在最後切換成主線程
                .subscribe(new Action1<Timestamped<Integer>>() {
                    @Override
                    public void call(Timestamped<Integer> integerTimestamped) {

                        SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
                        mText.append("value: "+integerTimestamped.getValue()+"       time:   ");
                        mText.append(sdf.format(new Date(integerTimestamped.getTimestampMillis()))+"\n");

                    }
                });