RXjava学习资料:
https://www.gitbook.com/book/yuxingxin/rxjava-essentials-cn/details
如下只是学习笔记而已,后面添加实战案例,现在只是理论总结:
Rxjava语言特点:
1,易于并发从而更好的利用服务器的能力;
2,易于有条件的异步执行;
3,一种更好的方式来避免回调地狱;
4,一种响应式方法。
RXjava源于观察者模式:
添加了如下三个缺少的功能:
1,生产者在没有更多数据可用时能够发出信号通知:oncompleted()事件。
2,生产者在发生错误的时候能够发出信号通知:onError()事件。
3,RxJavaObservables能够组合而不是嵌套,从而避免开发者陷入回调地狱。
RXjava中的四个角色:
1,Observable:观察得到的,看的见得
2,Observer:观察者
3,Subscriber:订阅者
4,Subjects:服从。
Observables和Subjects是两个“生产”实体
Observers和Subscribers是两个“消费”实体。
Rxjava之Observable
onNext(T)检索数据
onError(Throwable)发现错误
onCompleted()完成
RXjava之热Observables和冷Observables
热Observables:只要创建完就发射数据,订阅他的观察者从序列中某位置接受数据。
冷Observables:一直等待,知道观察者订阅他才发射数据,可确保能收到数据序列。
Observale创建的两种方式:
1:create
2:from:从列表,数组来创建Observable,并一个个发射数据
3,just
just()方法可以传入一到九个参数,它们会按照传入的参数的顺序来发射它
们。
无理由不发射数据并结束操作:
Observable.empty(),Observable.never(),和
Observable.throw()
RxJava提供四种不同的Subject:
PublishSubject
BehaviorSubject:向订阅者发送截止订阅前最新的一个数据对象,然后发送订阅后数据流;
ReplaySubject:缓存所订阅的所有数据,向任意一个订阅他的观察者重发数据流;
AsyncSubject:当Observable完成AnsySubject只会发布最后一个数据给已定订阅的每一个观察者。
as中开发使用RXjava,RXandroid:
gradle依赖:
其他推荐的as插件:
1,lombok:getsettostringequal等注解代码
2,butterknife:findviewbyidonclick代码等的注解代码
3,retrolambda:java8lambda相关函数
具体作用情百度或者谷歌进行搜索
just主要是得到原始的observable版本,在一个新的响应式架构的基础上迁移已存在的代码,这个方法可
能是一个有用的开始点。
repeat()重复发数据:
repeat(3):重复发送三次数据;
defer()声明一个Observable但是你又想推迟这个Observable的创建直到观察者订阅时
range()
你需要从一个指定的数字X开始发射N个数字吗?你可以用 range,下面是从10开始,发送3个数字:
interval()
interval() 函数在你需要创建一个轮询程序时非常好用。
如下每隔3秒toast下数据:
timer()
如果你需要一个一段时间之后才发射的Observable,你可以像下面的例子使用timer():
如下3秒后发送数据:
如何过滤数据 Observables?
RxJava让我们使用 filter() 方法来过滤我们观测序列中不想要的值,filter((appInfo) ->appInfo.getName().startsWith("C"))是只想展示c开头的数据。
1,过滤序列:
过滤空数据:
获取开头或者结尾的几个数据:take() 或 takeLast()。
take前几个数据;
takelast后几个数据。
如下是获取前三个数据:
获取后三个数据:
数据去重Distinct:
制造重复数据:
去重:
忽略掉重复的值并且在温度确实改变的时候才得到通知:ditinctuntilchanged()
也就是说有重复数据的时候,利用ditinctuntilchanged可以去重并展示重复数据中的一个数据。
发射第一个数据 first()
发射最后一个数据last()
观测序列完成,不在发射任何值得时候,firstOrDefault(),lastOrDefault()
跳过前两个数据 skip(2)
跳过后两个数据 skipLast(2),也就是跳过后两个的其他数据发射。
发射指定位置的元素:elementAt(2),从零开始,第三个元素发射
指定时间间隔由Observable发射最近一次的数值:
如下每隔三秒,updateDisplay(currentTemperature) 更新当前温度信息
timeout: timeout() 为一个Observable的限时的副本,如果在指定的时间间隔内Observable不发射值的话,它监听的原始的Observable时就会触发 onError() 函
数,也就是限定时间的字段:
下面代码意思就是每隔两秒刷新当前温度,如果不发射数据,就会打印:You should go check the sensor, dude。
Debounce
过滤掉由Observable发射的速率过快的数据;如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个。
2016年4月6日23:02:38
转换Observables
Map() :将函数作用于每一个发出去的元素上面去,以此创建一个新的Observable来发射转换的数据。
如下就是将原来的apps的名称小写,并发出去。
FlatMap RxJava的 flatMap() 函数提供一种铺平序列的方式,然后合并这些Observables发射的数据,最后将合并后的结果作为最终的Observable。
ConcatMap:解决了 flatMap() 的交叉问题,提供了一种能够把发射的值连续在一起的铺平函数,而不是合并它们。
FlatMapIterable:flatMapInterable() 和 flatMap() 很像。仅有的本质不同是它将源数据两两结成对并生成Iterable,而不是原始数据项和生成的Observables。
SwitchMap:switchMap() 和 flatMap() 很像,除了一点:每当源Observable发射一个新的数据项(Observable)时,它将取消订阅并停止监视之前那个数据项
产生的Observable,并开始监视当前发射的这一个。
Scan:scan() 函数对原始
Observable发射的每一项数据都应用一个函数,计算出函数的结果值。
groupBy() 来分组元素:
如下按照根据最近更新时间来进行分组。
Buffer RxJava中的 buffer() 函数将源Observable变换一个新的Observable,这个新的Observable每次发射一组列表值而不是一个一个发射。
buffer(count = 3)一组三个的方式进行发射一个列表的obserable。
buffer(count = 2,skip = 2)一组两个,跳过第三个来进行发射一个列表的obserable。
buffer(timespan = 2,count = 2)每隔2秒,每组2个发射一个列表的obserable.
Window:RxJava的 window() 函数和 buffer() 很像,但是它发射的是Observable而不是列表。
window(count = 3)原来observable的自己,每隔obserable三个元素。
window(skip = 3,count = 2),每隔第三个,每隔obserable中有2个元素。
Cast:
map的特殊版本。将原observale每一项转换为新的类型,把他变成不同class.
2016年4月7日22:23:38
Merge:
从单词角度分析,合并,rx中将帮助你把两个甚至更多的Observables合并到他们发射的数据项里。
Zip操作:
zip() 合并两个或者多个Observables发射出的数据项,根据指定的函数 Func* 变换它们,并发射一个新值。
Join操作:
RxJava的 join() 函数基于时间窗口将两个Observables发射的数据结合在一起。
如下意思:tictoc 这个Observable数据每秒只发射一个新的 Long 型整数。
为了合并它们,我们需要指定两个 Func1 变量:
appInfo -> Observable.timer(2, TimeUnit.SECONDS)
time -> Observable.timer(0, TimeUnit.SECONDS)
上面描述了两个时间窗口。下面一行描述我们如何使用 Func2 将两个发射的数据
结合在一起。
this::updateTitle
combineLatest操作:
zip() 作用于最近未打包的两个Observables。相反, combineLatest() 作用于最近发射的数据项:如果 Observable1 发射了A并且 Observable2 发射了B和C, combineLatest() 将会分组处理AB和AC。
如下:
一个是每秒钟从我们已安装的应用列表发射一个App数据,第二个是每隔1.5秒发射一个 Long 型整数。我们将他们结合起来并执行 updateTitle() 函数。
And,Then和When:
我们有两个发射的序列, observableApp ,发射我们安装的应用列表数据, tictoc 每秒发射一个 Long 型整数。现在我们用 and() 连接源
Observable和第二个Observable。
Switch:
subscribe-unsubscribe 的序列里我们能够从一个Observable自动取消订阅来订阅一个新的Observable。
将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项。
StartWith:
startWith() 是 concat() 的对应部分。正如 concat() 向发射数据的Observable追加数据那样,在Observable开始发射他们的数据之前,
startWith() 通过传递一个参数来先发射一个数据序列。
Schedulers调度器
处理多线程异步操作:
RxJava提供了5种调度器:
.io() :IO操作,增长缩减自适应线程池(线程池无限制)
.computation:计算工作默认的调度器,调度器:buffer(),debounce(),delay(),interval(),sample,skip()。
.immediate():允许在当前线程执行你指定的工作,是timeout(),timeInterval(),timestamp()默认调度器。
.newThread():为指定任务启动一个线程。
.trampoline():任务入队列,按需处理队列中的任务,是repeat()和retry()默认调度器。
如下就是简单将对图片文件的IO操作通过RX的io调度器来线程池中进行处理。
在主线程中返回结果。
onBackpressureBuffer() 方法将告诉Observable发射的数据如果比观察者消费的数据要更快的话,它必须把它们存储在缓存中并提供一个合适的时间给它们。
处理耗时的任务: