天天看点

ReactiveX Observable规范

Observable规范(Observable Contract):

http://reactivex.io/documentation/contract.html

“The Observable Contract”, 在ReactiveX的官方文档,网站以及源码注释上经常出现。是基于微软在2010发表的 Rx Design Guidelines(用于描述Rx.Net是如何实现ReactiveX的),对Observable机制进行的规范化尝试。下文基本是全文翻译,稍微插一点我自己的理解。

  1. 注: 下面描述的 “通信”, “发出”, “收到”, “Notification” 等不代表传统意义上的信息收发,而是在抽象层面的描述,在实现层面可以理解为一个对象调用另外一个对象的方法。
  2. 注: “Item”代表Observable向observer发送的一条信息(OnNext/OnError/OnCompleted)。
  3. 注: “Subscription” 是对observer subscribe Observable这一订阅关系的表述。
  4. Notifications, 代表抽象概念上,对象之间通信的“信息”。
    1. Observable向observer发送的Notification:
      1. OnNext: 表示Observable向observer发送了一条信息。
      2. OnCompleted: 表示Observable圆满结束发送任务,不会再发送信息。
      3. OnError: 表示Observable因为某个错误被终止,不会再发送信息。
      4. OnSubscribe (可选): 表明Observable开始准备接收来自observer的Request notifications(见下面的Backpressure介绍)
    2. observer可以向Observable发送下列Notification进行通信:
      1. Subscribe:表示observer开始接收来自Observable的notifications.
      2. Unsubscribe: 表示observer不再接收来自Observable的notifications.
      3. Request (可选): 表示observer向Observable请求不多于一定数量的附加性onNext notification.(见下面的Backpressure介绍)
  5. Notifications准则:
    1. Observable可以发出>=0个OnNext, 每次代表一个独立的item发出。
    2. 发送序列最后接一个*OnCompleted或OnError, 但是两者是互斥出现*的。
    3. 一旦发出了OnCompleted或者OnError, 后续不再发送任何Notification。
    4. 允许Observale**不发出任何Notification**。
    5. 允许Observable永远不发出OnCompleted或OnError, 永不结束。
    6. Observable可以自始至终只发一个OnCompleted或OnError。
    7. 注: 上述对Observable的特殊情况描述在RxJava中都有对应的Observable扩展类。
    8. Observable必须线性的向observers发送Notification, 禁止并行发送。比如,来自不同线程的Notification, 在投递时,Notification之间必须有明确的可以界定的先后关系(比如两个线程T1, T2都会调OnNext, 那么T1和T2调OnNext的过程中不能产生交织,比如T1先执行OnNext,在执行过程中,即使切换到了T2,T2也需要等待T1的OnNext完整执行以后才能执行自己的OnNext,其实描述的并不精确,只是为了好理解)。
  6. Observable结束:
    1. 只要Observable没有发出OnCompleted或OnError, 从observer角度看,这个Observable仍然是“活”的(即使它现在并没有发送Notification),可以继续和该Observable进行通信(比如Unsubscribe或Request)。
    2. Observable发出OnCompleted或OnError代表Observable的结束,不能再通信。
    3. OnError必须携带错误的原因描述(这意味着OnError回传null是非法的)。
    4. Observable结束前必须保证向其observers发送了OnCompleted/OnError。
  7. 订阅(Subscribing)和注销(Unsubscribing):
    1. Observable在收到observer的Subscribe后就可以立即向observer发送Notification.
    2. observer 向Observable发出Unsubscribe后, Observable会尝试停止向observer发送Notification。但是不保证Observable不会再向observer发送Notification(注:因此observer需要自己处理这种情况)
    3. Observable发送OnError/OnCompleted**代表着Subscription的结束**。这种情况下,observer不需要再向Observable发送UnSubscribe来结束Subscription。
  8. 多重Observer
    1. 假设有两个observer A, B. A先subscribe了Observable,Observable发送了复数个Notification给A,然后B subscribe了 Observable。
    2. 在B subscribe 时,取决于Observable的具体策略或者实现,可能有如下的行为:
      1. Observable在下一次发送Notification时同时发给A和B。
      2. Observable将B subscribe之前发出的Notification replay给B。
      3. Observable会向B发送次序不同于A的Notification序列。
      4. 等等其他。
    3. 不保证subscribe同一个Observable的复数个observer可以收到相同次序的Notification序列
  9. Backpressure(背压: refers to pressure opposed to the desired flow of gasses in confined places such as a pipe)
    1. Backpressure是可选的,并非所有的ReaciveX实现都包含了Backpressure,但是能实现当然是更好的。
    2. observer实现了Request Notification,并能理解和响应OnSubscribe Notification,是Observable实现Backpressure的先决条件。
    3. observer收到OnSubscribe Notification时会向其subscribe的Observable发生一个Request Notification。该Notification表示请求一定数量的item。Observable应该回复给observer不大于该数量的item。如果有需要的话,Observable可以在后面附加一个OnCompleted/OnError, 甚至在observer 开始request前,都可以发出OnCompleted/OnError.
    4. 如果Observable不支持Backpressure,其在收到observer的Request时,应该回复一个OnError表明其不支持。
    5. Requests可以累加。比如,observer 向Observable发送了3次Request,分别要求3,5,10个item,那么最终总计Observable会向observer发送18个item。
    6. 如果Observable产出了大于observer要求数量的item,如何处理多余的item取决于Observable,可以丢弃,暂存或者其他处理方式。

继续阅读