天天看點

Rx學習

  RXjava學習資料:

  

  https://www.gitbook.com/book/yuxingxin/rxjava-essentials-cn/details

  如下隻是學習筆記而已,後面添加實戰案例,現在隻是理論總結:

  Rxjava語言特點:

  1,易于并發進而更好的利用伺服器的能力;

  2,易于有條件的異步執行;

  3,一種更好的方式來避免回調地獄;

  4,一種響應式方法。

  RXjava源于觀察者模式:

  添加了如下三個缺少的功能:

  1,生産者在沒有更多資料可用時能夠發出信号通知:oncompleted()事件。

  2,生産者在發生錯誤的時候能夠發出信号通知:onError()事件。

  3,RxJavaObservables能夠組合而不是嵌套,進而避免開發者陷入回調地獄。

  RXjava中的四個角色:

  1,Observable:觀察得到的,看的見得

  2,Observer:觀察者

  3,Subscriber:訂閱者

  4,Subjects:服從。

  Observables和Subjects是兩個“生産”實體

  Observers和Subscribers是兩個“消費”實體。

  Rxjava之Observable

  onNext(T)檢索資料

  onError(Throwable)發現錯誤

  onCompleted()完成

  RXjava之熱Observables和冷Observables

  熱Observables:隻要建立完就發射資料,訂閱他的觀察者從序列中某位置接受資料。

  冷Observables:一直等待,知道觀察者訂閱他才發射資料,可確定能收到資料序列。

Observale建立的兩種方式:

1:create

2:from:從清單,數組來建立Observable,并一個個發射資料

  3,just

  just()方法可以傳入一到九個參數,它們會按照傳入的參數的順序來發射它

  們。

  無理由不發射資料并結束操作:

  Observable.empty(),Observable.never(),和

  Observable.throw()

  RxJava提供四種不同的Subject:

  PublishSubject

  BehaviorSubject:向訂閱者發送截止訂閱前最新的一個資料對象,然後發送訂閱後資料流;

  ReplaySubject:緩存所訂閱的所有資料,向任意一個訂閱他的觀察者重發資料流;

  AsyncSubject:當Observable完成AnsySubject隻會釋出最後一個資料給已定訂閱的每一個觀察者。

  as中開發使用RXjava,RXandroid:

  gradle依賴:

  其他推薦的as插件:

  1,lombok:getsettostringequal等注解代碼

  2,butterknife:findviewbyidonclick代碼等的注解代碼

  3,retrolambda:java8lambda相關函數

  具體作用情百度或者谷歌進行搜尋

  just主要是得到原始的observable版本,在一個新的響應式架構的基礎上遷移已存在的代碼,這個方法可

  能是一個有用的開始點。

  repeat()重複發資料:

  repeat(3):重複發送三次資料;

defer()聲明一個Observable但是你又想推遲這個Observable的建立直到觀察者訂閱時

range()

你需要從一個指定的數字X開始發射N個數字嗎?你可以用 range,下面是從10開始,發送3個數字:

interval()

interval() 函數在你需要建立一個輪詢程式時非常好用。

如下每隔3秒toast下資料:

  timer()

  如果你需要一個一段時間之後才發射的Observable,你可以像下面的例子使用timer():

  如下3秒後發送資料:

  如何過濾資料 Observables?

  RxJava讓我們使用 filter() 方法來過濾我們觀測序列中不想要的值,filter((appInfo) ->appInfo.getName().startsWith("C"))是隻想展示c開頭的資料。

1,過濾序列:

  過濾空資料:

  擷取開頭或者結尾的幾個資料:take() 或 takeLast()。

  take前幾個資料;

  takelast後幾個資料。

  如下是擷取前三個資料:

  擷取後三個資料:

  資料去重Distinct:

  制造重複資料:

  去重:

 忽略掉重複的值并且在溫度确實改變的時候才得到通知:ditinctuntilchanged()

也就是說有重複資料的時候,利用ditinctuntilchanged可以去重并展示重複資料中的一個資料。

發射第一個資料 first()

發射最後一個資料last()

觀測序列完成,不在發射任何值得時候,firstOrDefault(),lastOrDefault()

跳過前兩個資料 skip(2)

跳過後兩個資料 skipLast(2),也就是跳過後兩個的其他資料發射。

發射指定位置的元素:elementAt(2),從零開始,第三個元素發射

指定時間間隔由Observable發射最近一次的數值:

如下每隔三秒,updateDisplay(currentTemperature) 更新目前溫度資訊

 timeout:  timeout() 為一個Observable的限時的副本,如果在指定的時間間隔内Observable不發射值的話,它監聽的原始的Observable時就會觸發 onError() 函

數,也就是限定時間的字段:

下面代碼意思就是每隔兩秒重新整理目前溫度,如果不發射資料,就會列印:You should go check the sensor, dude。

Debounce

過濾掉由Observable發射的速率過快的資料;如果在一個指定的時間間隔過去了仍舊沒有發射一個,那麼它将發射最後的那個。

 2016年4月6日23:02:38

轉換Observables

Map() :将函數作用于每一個發出去的元素上面去,以此建立一個新的Observable來發射轉換的資料。

如下就是将原來的apps的名稱小寫,并發出去。

FlatMap RxJava的 flatMap() 函數提供一種鋪平序列的方式,然後合并這些Observables發射的資料,最後将合并後的結果作為最終的Observable。

ConcatMap:解決了 flatMap() 的交叉問題,提供了一種能夠把發射的值連續在一起的鋪平函數,而不是合并它們。

FlatMapIterable:flatMapInterable() 和 flatMap() 很像。僅有的本質不同是它将源資料兩兩結成對并生成Iterable,而不是原始資料項和生成的Observables。

SwitchMap:switchMap() 和 flatMap() 很像,除了一點:每當源Observable發射一個新的資料項(Observable)時,它将取消訂閱并停止監視之前那個資料項

産生的Observable,并開始監視目前發射的這一個。

Scan:scan() 函數對原始

Observable發射的每一項資料都應用一個函數,計算出函數的結果值。

groupBy()  來分組元素:

如下按照根據最近更新時間來進行分組。

  Buffer RxJava中的 buffer() 函數将源Observable變換一個新的Observable,這個新的Observable每次發射一組清單值而不是一個一個發射。

  buffer(count = 3)一組三個的方式進行發射一個清單的obserable。

  buffer(count = 2,skip = 2)一組兩個,跳過第三個來進行發射一個清單的obserable。

  buffer(timespan = 2,count = 2)每隔2秒,每組2個發射一個清單的obserable.

  Window:RxJava的 window() 函數和 buffer() 很像,但是它發射的是Observable而不是清單。

  window(count = 3)原來observable的自己,每隔obserable三個元素。

  window(skip = 3,count = 2),每隔第三個,每隔obserable中有2個元素。

  Cast:

  map的特殊版本。将原observale每一項轉換為新的類型,把他變成不同class.

 2016年4月7日22:23:38

 Merge:

從單詞角度分析,合并,rx中将幫助你把兩個甚至更多的Observables合并到他們發射的資料項裡。

Zip操作:

zip() 合并兩個或者多個Observables發射出的資料項,根據指定的函數 Func* 變換它們,并發射一個新值。

  Join操作:

  RxJava的 join() 函數基于時間視窗将兩個Observables發射的資料結合在一起。

  如下意思:tictoc 這個Observable資料每秒隻發射一個新的 Long 型整數。

  為了合并它們,我們需要指定兩個 Func1 變量:

  appInfo -> Observable.timer(2, TimeUnit.SECONDS)

  time -> Observable.timer(0, TimeUnit.SECONDS)

  上面描述了兩個時間視窗。下面一行描述我們如何使用 Func2 将兩個發射的資料

  結合在一起。

  this::updateTitle

  combineLatest操作:

  zip() 作用于最近未打包的兩個Observables。相反, combineLatest() 作用于最近發射的資料項:如果 Observable1 發射了A并且 Observable2 發射了B和C, combineLatest() 将會分組處理AB和AC。

  如下:

  一個是每秒鐘從我們已安裝的應用清單發射一個App資料,第二個是每隔1.5秒發射一個 Long 型整數。我們将他們結合起來并執行 updateTitle() 函數。

And,Then和When:

  我們有兩個發射的序列, observableApp ,發射我們安裝的應用清單資料, tictoc 每秒發射一個 Long 型整數。現在我們用 and() 連接配接源

  Observable和第二個Observable。

Switch:

  subscribe-unsubscribe 的序列裡我們能夠從一個Observable自動取消訂閱來訂閱一個新的Observable。

  将一個發射多個Observables的Observable轉換成另一個單獨的Observable,後者發射那些Observables最近發射的資料項。

StartWith:

  startWith() 是 concat() 的對應部分。正如 concat() 向發射資料的Observable追加資料那樣,在Observable開始發射他們的資料之前,

  startWith() 通過傳遞一個參數來先發射一個資料序列。

Schedulers排程器

 處理多線程異步操作:

  RxJava提供了5種排程器:

  .io() :IO操作,增長縮減自适應線程池(線程池無限制)

  .computation:計算工作預設的排程器,排程器:buffer(),debounce(),delay(),interval(),sample,skip()。

  .immediate():允許在目前線程執行你指定的工作,是timeout(),timeInterval(),timestamp()預設排程器。

  .newThread():為指定任務啟動一個線程。

  .trampoline():任務入隊列,按需處理隊列中的任務,是repeat()和retry()預設排程器。

如下就是簡單将對圖檔檔案的IO操作通過RX的io排程器來線程池中進行處理。

在主線程中傳回結果。

onBackpressureBuffer()  方法将告訴Observable發射的資料如果比觀察者消費的資料要更快的話,它必須把它們存儲在緩存中并提供一個合适的時間給它們。

處理耗時的任務: