天天看点

RxSwift操作符详解共演示了大概60个操作符的使用- Creating Operators- Combining Operators- Filtering Operators- Transforming Operators- Convert Operator- Mathematical and Aggregate Operators- Connectable Operators- Error Handling Operators- Conditional and Boolean Operators

共演示了大概60个操作符的使用

Demo 项目地址

- Creating Operators

never

创建既不发射数据也不会终止的序列

Observable<String>.never()
            .subscribe{
                print($)
            }.disposed(by: GlobalDisposeBag)
           
无任何输出
           

empty

创建不发射数据但会正常终止的序列

Observable<String>.empty()
            .subscribe{
                print($)
            }.disposed(by: GlobalDisposeBag)
           
输出:
completed
           

error

创建不发射数据但异常终止的序列

Observable<String>.error(ExampleError)
            .subscribe{
                print($)
            }.disposed(by: GlobalDisposeBag)
           
输出:
error(Error Domain=RxExampleError Code= "(null)" UserInfo={msg=这是一个示例错误})
           

just

创建只发射一个数据的序列,然后序列正常终止

Observable<String>.just("onlyMe")
            .subscribe{
                print($)
            }.disposed(by: GlobalDisposeBag)
           
输出:
next(onlyMe)
completed
           

of

创建发射若干数据的序列,然后序列正常终止

Observable<String>.of("a","b","c")
            .subscribe{
                print($)
            }.disposed(by: GlobalDisposeBag)
           
输出:
next(a)
next(b)
next(c)
completed
           

from

将其他’集合类型’转换为序列,序列不会终止。

Observable<String>.from(["1", "2", "3", "4"])
            .subscribe(onNext: { print($) })
            .disposed(by: GlobalDisposeBag)
           
输出:
next(a)
next(b)
next(c)
completed
           

create

自定义一个序列,此处Demo中的发射逻辑是:每隔一秒按顺序发射指定数组中的一个元素,直至数组中所有元素全部被发射,然后终止序列。

let arr = [,,,]
        Observable<Int>.create { observer in
            var index = 
            Timer.scheduledTimer(withTimeInterval: , repeats: true, block: { (timer) in
                if index >= arr.count{
                    observer.onCompleted()
                    timer.invalidate()
                }else{
                    observer.onNext(arr[index])
                }
                index = index + 
            })
            return Disposables.create()
            }
            .subscribe { print($) }
            .disposed(by: GlobalDisposeBag)
           
输出:
next()
next()
next()
next()
completed
           

range

创建一个发射指定范围整数的序列。

注:这里的 Observable 必须是 Observable, 因为该操作符是用于整型数的

Observable<Int>.range(start: , count: )
            .subscribe { print($) }
            .disposed(by: GlobalDisposeBag)
           
输出:
next()
next()
next()
next()
next()
completed
           

generate

创建一个序列,该序列会根据初始值和迭代逻辑产生数据并发射,当数据满足条件时则会发射数据,如果不满足,则序列中止。迭代结果会作为下一次迭代的输入。

Observable.generate(
            initialState: ,
            condition: { $0 <  },
            iterate: { $0 +  }
            )
            .subscribe { print($0) }
            .disposed(by: GlobalDisposeBag)
           
输出:
next()
next()
next()
completed
           

defer

延迟创建序列操作符。该操作符会接收一个序列工厂函数,当订阅发生时,该序列才会被真正的创建,并且其会为每个订阅创建序列。这里需要注意 defer 延迟的是 Observable.create (创建序列)这个动作,而不是 Observable.create 这个方法传入的闭包的执行时机,Observable.create 传入的闭包参数是一个subscription,其默认就是在订阅时执行的。

var seqFlag = 
        let deferredSequence = Observable<String>.deferred {
            seqFlag += 
            print("创建序列 seqFlag:\(seqFlag)")
            return Observable.create { observer in
                observer.onNext(String(seqFlag) + "a")
                observer.onNext(String(seqFlag) + "b")
                observer.onCompleted()
                return Disposables.create()
            }
        }
        print("订阅1")
        deferredSequence
            .subscribe {print($)}
            .disposed(by: GlobalDisposeBag)

        print("订阅2")
        deferredSequence
            .subscribe {print($)}
            .disposed(by: GlobalDisposeBag)
           
输出:
订阅
创建序列 seqFlag:
next(a)
next(b)
completed
订阅
创建序列 seqFlag:
next(a)
next(b)
completed
           

interval

创建根据指定间隔时间无限发射递增整数数据的序列。

Observable<Int>.interval(, scheduler: MainScheduler.asyncInstance)
            .subscribe{print($)}
            .disposed(by: GlobalDisposeBag)
           
输出:
next()
next()
next()
next()
next()
next()
.............

           

timer

创建一个序列,它会在指定延迟后根据指定的时间间隔无限发射递增整数数据。

let seq = Observable<Int>.timer(, period: ,scheduler: MainScheduler.asyncInstance)
print("订阅 time:",Date())
seq.subscribe{print("收到事件:",$,"time:",Date())}
    .disposed(by: GlobalDisposeBag)
           
输出:
订阅 time: 2018-03-23 03:06:49 +0000
收到事件: next(0) time: 2018-03-23 03:06:51 +0000
收到事件: next(1) time: 2018-03-23 03:06:52 +0000
收到事件: next(2) time: 2018-03-23 03:06:53 +0000
收到事件: next(3) time: 2018-03-23 03:06:54 +0000
收到事件: next(4) time: 2018-03-23 03:06:55 +0000
收到事件: next(5) time: 2018-03-23 03:06:56 +0000
收到事件: next(6) time: 2018-03-23 03:06:57 +0000
............
           

- Combining Operators

startWith

最终序列会在发射原始序列数据前,先发射 startWith 操作符插入的数据。 注:startWith 插入的数据是’先进后出’的

let ob = Observable.of("a", "b", "c", "d")
            .startWith("1")
            .startWith("2")
            .startWith("3", "4", "5")

ob.subscribe(onNext: { print($) })
    .disposed(by: GlobalDisposeBag)
           
输出:





a
b
c
d
           

merge

合并多个序列,当被合并的序列中有任何一个序列发射数据时,该数据则会通过’最终序列’发射出去。最终序列发射数据是没有顺序的,被合并的序列只要发射数据,最终序列就会立即将其发射出去。

let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
Observable<String>.merge([subject1,subject2])
    .subscribe(onNext: { print($) })
    .disposed(by: disposeBag)

subject1.onNext("?️")
subject1.onNext("?️")
subject2.onNext("①")
subject2.onNext("②")
subject1.onNext("?")
subject2.onNext("③")
           
输出:
?️
?️
①
②
?
③
           

merge (被合并的序列中有序列异常终止的场景)

只要有一个序列异常终止,则最终序列就会异常终止

let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
Observable.of(subject1, subject2)
    .merge()
    .subscribe {print($)}
    .disposed(by: GlobalDisposeBag)
subject1.onNext("?️")
subject1.onNext("?️")

subject2.onNext("①")
subject2.onNext("②")

subject1.onError(ExampleError)

subject2.onNext("③")
           
输出:
next(?️)
next(?️)
next(①)
next(②)
error(Error Domain=RxExampleError Code= "(null)" UserInfo={msg=这是一个示例错误})
           

merge (merge后的序列正常终止的场景)

注:必须所有序列都正常终止,最终序列才会正常终止。

let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
Observable.of(subject1, subject2)
    .merge()
    .subscribe {print($)}
    .disposed(by: GlobalDisposeBag)
subject1.onNext("?️")
subject1.onNext("?️")

subject2.onNext("①")
subject2.onNext("②")

subject1.onCompleted()

subject2.onNext("③")

subject2.onCompleted()
           
输出:
next(?️)
next(?️)
next(①)
next(②)
next(③)
completed
           

zip

当被’压缩’的任何一个序列发射数据时,zip会寻找所有其他序列发射过的数据中’发射序号’(这个数据在序列中是第几个发射的)与当前发射数据的’发射序号’相同的数据,’压缩’为一个数据在’最终序列’中发射出去。 特别需要注意的是’发射序号’相同这个条件,比如有A、B、C三个序列,当A发射了第一个数据,B也发射了第一个数据,C没有发射,’最终序列’不会发射数据, 之后,当C发射了第一个数据,此时就会将A、B、C序列中的第一个数据’压缩’,然后在’最终序列’中发射。

zip弹珠图

zip 接受一个用与处理’压缩’的函数 RxSwift 的实现中,zip最多可以合并8个序列

let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()

Observable.zip(stringSubject, intSubject) { stringElement, intElement in
    "\(stringElement) \(intElement)"
    }
    .subscribe{ print($) }
    .disposed(by: GlobalDisposeBag)

stringSubject.onNext("?️")
stringSubject.onNext("?️")

intSubject.onNext()

intSubject.onNext()

stringSubject.onNext("?")
intSubject.onNext()
           
输出:
next(?️ )
next(?️ )
next(? )
           

zip(被 zip的序列中有序列异常终止的场景)

xxxxxxxxxxx

let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()

Observable.zip(stringSubject, intSubject) { stringElement, intElement in
    "\(stringElement) \(intElement)"
    }
    .subscribe{ print($) }
    .disposed(by: GlobalDisposeBag)

stringSubject.onNext("a")
stringSubject.onNext("b")
intSubject.onNext()
intSubject.onNext()

stringSubject.onNext("c")
stringSubject.onNext("d")
stringSubject.onError(ExampleError)
intSubject.onNext()
intSubject.onCompleted()
           
输出:
next(a )
next(b )
error(Error Domain=RxExampleError Code= "(null)" UserInfo={msg=这是一个示例错误})
           

zip(zip后的序列正常终止的场景)

压缩的序列中即使某个序列正常终止了,如果其他序列还在发射元素,并且能够匹配到其他已终止序列中相同’发射序号’的元素,那么就仍然可以执行zip操作。

let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()

Observable.zip(stringSubject, intSubject) { stringElement, intElement in
    "\(stringElement) \(intElement)"
    }
    .subscribe{ print($) }
    .disposed(by: GlobalDisposeBag)

stringSubject.onNext("a")
stringSubject.onNext("b")
intSubject.onNext()
intSubject.onNext()

stringSubject.onNext("c")
stringSubject.onNext("d")
stringSubject.onCompleted()
intSubject.onNext()
intSubject.onCompleted()
           
输出:
next(a )
next(b )
next(c )
completed
           

combineLatest

与zip有点类似,不同的是,当被 combine 的任何一个序列发射数据时,combineLatest会把所有序列中的最近一次发射的数据’合并’为一个数据,然后在’最终序列’中发射出去。其正常终止 和 异常终止的行为与 zip 一样。combineLatest弹珠图

let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()

Observable.combineLatest(stringSubject, intSubject) { stringElement, intElement in
    "\(stringElement) \(intElement)"
    }
    .subscribe(onNext: { print($) })
    .disposed(by: GlobalDisposeBag)

stringSubject.onNext("?️")

stringSubject.onNext("?️")
intSubject.onNext()

intSubject.onNext()

stringSubject.onNext("?")
           
输出:
?️ 1
?️ 2
? 2
           

switchLatest

当订阅一个发射序列的序列时,使用 switchLatest 会切换订阅目标至最近被发射的序列。

let subSeq1 = PublishSubject<String>.init()
let subSeq2 = PublishSubject<String>.init()
let  sourceSeq = PublishSubject<PublishSubject<String>>.init()

sourceSeq.switchLatest()
    .subscribe{print($)}
    .disposed(by: GlobalDisposeBag)

sourceSeq.onNext(subSeq1)
subSeq1.onNext("1a")
subSeq1.onNext("1b")

sourceSeq.onNext(subSeq2)
subSeq1.onNext("1c") //这个c不会输出 因为订阅会切换到最新的序列上 (subSeq2)

subSeq2.onNext("2a")
subSeq2.onNext("2b")

sourceSeq.onNext(subSeq1)
subSeq1.onNext("1d") //此时会输出 d 因为subSeq1被发射,订阅切换到 subSeq1 上
           
输出:
next(a)
next(b)
next(a)
next(b)
next(d)
           

switchLatest (原始序列终止其他序列不终止的场景)

只要原始序列正常终止,其他序列不终止, 最终序列并不会终止。

let subSeq1 = PublishSubject<String>.init()
let subSeq2 = PublishSubject<String>.init()
let sourceSeq = PublishSubject<PublishSubject<String>>.init()

sourceSeq.switchLatest()
    .subscribe{print($)}
    .disposed(by: GlobalDisposeBag)

sourceSeq.onNext(subSeq1)
subSeq1.onNext("1a")
subSeq1.onNext("1b")

sourceSeq.onNext(subSeq2)

subSeq2.onNext("2a")
subSeq2.onNext("2b")
sourceSeq.onCompleted()
subSeq2.onNext("2c") //会输出
           
输出:
next(a)
next(b)
next(a)
next(b)
next(c)

           

switchLatest (最终序列正常终止场景)

只有当 原始序列 和 当前所订阅的序列都正常终止时,最终序列才会正常终止。

let subSeq1 = PublishSubject<String>.init()
let subSeq2 = PublishSubject<String>.init()
let sourceSeq = PublishSubject<PublishSubject<String>>.init()

sourceSeq.switchLatest()
    .subscribe{print($)}
    .disposed(by: GlobalDisposeBag)

sourceSeq.onNext(subSeq1)
subSeq1.onNext("1a")
subSeq1.onNext("1b")

sourceSeq.onNext(subSeq2)

subSeq2.onNext("2a")
subSeq2.onNext("2b")
sourceSeq.onCompleted()
subSeq2.onCompleted()
subSeq2.onNext("2c") //不会输出
           
输出:
next(a)
next(b)
next(a)
next(b)
completed

           

switchLatest (最终序列异常终止场景)

当原始序列 或 当前订阅的序列中任何一个异常终止,最终序列会立即异常终止

let subSeq1 = PublishSubject<String>.init()
let subSeq2 = PublishSubject<String>.init()
let sourceSeq = PublishSubject<PublishSubject<String>>.init()

sourceSeq.switchLatest()
    .subscribe{print($)}
    .disposed(by: GlobalDisposeBag)

sourceSeq.onNext(subSeq1)
subSeq1.onNext("1a")
subSeq1.onNext("1b")

sourceSeq.onNext(subSeq2)

subSeq2.onNext("2a")
subSeq2.onNext("2b")
//subSeq2.onError(ExampleError)//订阅序列异常终止的场景,与 sourceSeq异常终止的效果一样。去除注释运行可以查看效果。
sourceSeq.onError(ExampleError)
           
输出:
next(a)
next(b)
next(a)
next(b)
error(Error Domain=RxExampleError Code= "(null)" UserInfo={msg=这是一个示例错误})
           

- Filtering Operators

filter

过滤掉序列中不满足条件的数据

//过滤奇数
Observable.of(,,,,)
    .filter {
        $0 %  == 
    }
    .subscribe{ print($0) }
    .disposed(by: GlobalDisposeBag)
           
输出:
next()
next()
completed
           

ignoreElements

忽略所有元素,序列正常终止。

Observable<Int>.of(,)
            .ignoreElements()
            .subscribe{print($)}
            .disposed(by: GlobalDisposeBag)
           
输出:
completed
           

distinctUntilChanged

过滤掉与上一条数据相同的数据

//过滤连续相同元素
Observable.of(,,,,,,,,,,)
    .distinctUntilChanged()
    .subscribe{ print($) }
    .disposed(by: GlobalDisposeBag)
           
输出:
next()
next()
next()
next()
next()
next()
next()
next()
next()
completed
           

elementAt

发射序列中指定位置的数据

Observable.of("a","b","c","d")
            .elementAt()
            .subscribe{ print($) }
            .disposed(by: GlobalDisposeBag)
           
输出:
next(c)
completed
           

take

发射序列中前N条数据

Observable.of(,,,)
            .take()
            .subscribe{ print($) }
            .disposed(by: GlobalDisposeBag)
           
输出:
next()
next()
completed
           

takeLast

发射序列后N条数据。由于是倒序指定,所以需要序列正常终止后才会开始发射数据。如果序列异常终止,则不会发射任何数据

let seq = Observable<Int>.create { (observer) -> Disposable in
            var i = 
            Timer.scheduledTimer(withTimeInterval: , repeats: true, block: { (timer) in
                if i <= {
                    observer.onNext(i)
                    i = i+
                }else{
                    observer.onCompleted()
                    timer.invalidate()
                    //如果是 异常终止 ,则不会发射数据
                    //observer.onError("Some error")
                }
            }).fire()
            return Disposables.create()
            }.takeLast()
print("订阅 time: ", Date())
seq.subscribe{print("收到事件:",$," time: ",Date())}
    .disposed(by: GlobalDisposeBag)
           
输出:
订阅 time:  2018-03-23 04:16:41 +0000
收到事件: next(2)  time:  2018-03-23 04:16:45 +0000
收到事件: next(3)  time:  2018-03-23 04:16:45 +0000
收到事件: next(4)  time:  2018-03-23 04:16:45 +0000
收到事件: completed  time:  2018-03-23 04:16:45 +0000
           

takeWhile

序列一直发射数据直到某条数据不满足指定条件,然后序列正常终止。

Observable.of(, , , , , )
            .takeWhile { $ <  }
            .subscribe { print($) }
            .disposed(by: GlobalDisposeBag)
           
输出:
next()
next()
next()
completed
           

takeUntil

一直发射数据直到参照序列发射数据时,原始序列正常终止。

let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()

sourceSequence
    .takeUntil(referenceSequence)
    .subscribe { print($0) }
    .disposed(by: GlobalDisposeBag)

sourceSequence.onNext("1")
sourceSequence.onNext("2")
sourceSequence.onNext("3")
print("参照序列发射元素")
referenceSequence.onNext("a")

sourceSequence.onNext("4")
sourceSequence.onNext("5")
sourceSequence.onNext("6")
           
输出:
next()
next()
next()
参照序列发射元素
completed
           

takeUntil(最终序列异常终止场景)

参照序列异常终止则最终序列也异常终止

let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()

sourceSequence
    .takeUntil(referenceSequence)
    .subscribe { print($0) }
    .disposed(by: GlobalDisposeBag)

sourceSequence.onNext("1")
sourceSequence.onNext("2")
sourceSequence.onNext("3")
print("参照序列异常终止")
referenceSequence.onError("Some Error")

sourceSequence.onNext("4")
sourceSequence.onNext("5")
sourceSequence.onNext("6")
           
输出:
next()
next()
next()
参照序列异常终止
error(Error Domain=RxExampleError Code= "(null)" UserInfo={msg=这是一个示例错误})
           

takeUntil(参照序列正常终止场景)

参照序列正常终止,原始序列并不会终止。

let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()

sourceSequence
    .takeUntil(referenceSequence)
    .subscribe { print($0) }
    .disposed(by: GlobalDisposeBag)

sourceSequence.onNext("1")
sourceSequence.onNext("2")
sourceSequence.onNext("3")
referenceSequence.onNext("a")
print("参照序列正常终止")
referenceSequence.onCompleted()

sourceSequence.onNext("4")
sourceSequence.onNext("5")
sourceSequence.onNext("6")
           
输出:
next()
next()
next()
completed
参照序列正常终止
           

skip

序列发射时跳过前N条数据

Observable.of(,,,,)
            .skip()
            .subscribe{ print($) }
            .disposed(by: GlobalDisposeBag)
           
输出:
next()
next()
next()
completed
           

skipDuration

序列在N秒后开始发射数据,

Observable<Int>.create { (observer) -> Disposable in
            var i = 
            Timer.scheduledTimer(withTimeInterval: , repeats: true, block: { (timer) in
                print("原始序列发射数据: ",i,"time: ",Date())
                if i <= {
                    observer.onNext(i)
                }else{
                    observer.onCompleted()
                    timer.invalidate()
                }
                i = i+
            })
            return Disposables.create()
            }
            .skip(, scheduler: MainScheduler.instance)
            .subscribe{print("接收到数据: ",$," time: ",Date())}
            .disposed(by: GlobalDisposeBag)
           
输出:
原始序列发射数据:  1 time:  2018-03-23 04:24:45 +0000
原始序列发射数据:  2 time:  2018-03-23 04:24:46 +0000
原始序列发射数据:  3 time:  2018-03-23 04:24:47 +0000
接收到数据:  next(3)  time:  2018-03-23 04:24:47 +0000
原始序列发射数据:  4 time:  2018-03-23 04:24:48 +0000
接收到数据:  next(4)  time:  2018-03-23 04:24:48 +0000
原始序列发射数据:  5 time:  2018-03-23 04:24:49 +0000
接收到数据:  next(5)  time:  2018-03-23 04:24:49 +0000
原始序列发射数据:  6 time:  2018-03-23 04:24:50 +0000
接收到数据:  completed  time:  2018-03-23 04:24:50 +0000
           

skipWhile

序列会跳过不满条件的数据,然后开始发射,开始发射后则不会跳过不满足条件的数据。

Observable.of(, , , , , , )
            .skipWhile { $ <  }
            .subscribe{ print($) }
            .disposed(by: GlobalDisposeBag)
           
输出:
next()
next()
next()
next()
completed
           

skipUntil

直到参照序列发射数据,最终序列才开始发射

let sourceSequence = PublishSubject<Int>()
let referenceSequence = PublishSubject<String>()

sourceSequence
    .skipUntil(referenceSequence)
    .subscribe{ print($0) }
    .disposed(by: GlobalDisposeBag)

sourceSequence.onNext()
sourceSequence.onNext()
sourceSequence.onNext()
print("参照序列发射元素")
referenceSequence.onNext("?")

sourceSequence.onNext()
sourceSequence.onNext()
sourceSequence.onNext()
           
输出:
参照序列发射元素
next()
next()
next()
           

skipUntil (最终序列异常终止场景)

参照序列异常终止,则最终序列也会异常终止,不会发射任何数据。

let sourceSequence = PublishSubject<Int>()
let referenceSequence = PublishSubject<String>()

sourceSequence
    .skipUntil(referenceSequence)
    .subscribe{ print($0) }
    .disposed(by: GlobalDisposeBag)

sourceSequence.onNext()
sourceSequence.onNext()
sourceSequence.onNext()
print("参照序列异常终止")
referenceSequence.onError(ExampleError)

sourceSequence.onNext()
sourceSequence.onNext()
sourceSequence.onNext()
           
输出:
参照序列异常终止
error(Error Domain=RxExampleError Code= "(null)" UserInfo={msg=这是一个示例错误})
           

skipUntil (参照序列正常终止场景)

参照序列正常终止,则最终序列不会发射任何数据也不会终止

let sourceSequence = PublishSubject<Int>()
let referenceSequence = PublishSubject<String>()

sourceSequence
    .skipUntil(referenceSequence)
    .subscribe{ print($0) }
    .disposed(by: GlobalDisposeBag)

sourceSequence.onNext()
sourceSequence.onNext()
sourceSequence.onNext()
print("参照序列正常终止")
referenceSequence.onCompleted()

sourceSequence.onNext()
sourceSequence.onNext()
sourceSequence.onNext()
           
输出:
参照序列正常终止
           

sample

该操作符接收一个用于‘采样’的序列,采样序列会不定时采样(RxSwfit中的实现是当采样序列发射数据时,则对原始序列进行采样),采得的数据会通过最终序列发射。 原始序列或采样序列终止,则最终序列终止。

let sourceSeq = Observable<Int>.create { (observer) -> Disposable in
    var i = 
    Timer.scheduledTimer(withTimeInterval: , repeats: true, block: { (timer) in
        if i <= {
            print("原始序列发射数据:\(i) [第 \(i) 秒]")
            observer.onNext(i)
        }else{
            observer.onCompleted()
            timer.invalidate()
        }
        i = i+
    })
    return Disposables.create()
}

let samplerSeq = Observable<String>.create { (observer) -> Disposable in
    var i = 
    Timer.scheduledTimer(withTimeInterval: , repeats: true, block: { (timer) in
        if i <= {
            let item = String(i)+"SamplerItem"
            print("采样序列发射数据:\(item)")
            observer.onNext(item)
        }else{
            observer.onCompleted()
            timer.invalidate()
        }
        i = i+
    })
    return Disposables.create()
}
sourceSeq.sample(samplerSeq).subscribe{print("观察者接收:",$)}.disposed(by: GlobalDisposeBag)
           
输出:
原始序列发射数据:1 [第 1 秒]
原始序列发射数据:2 [第 2 秒]
原始序列发射数据:3 [第 3 秒]
采样序列发射数据:1SamplerItem
订阅者接收到数据: next(3)
原始序列发射数据:4 [第 4 秒]
原始序列发射数据:5 [第 5 秒]
采样序列发射数据:2SamplerItem
订阅者接收到数据: next(5)
订阅者接收到数据: completed
采样序列发射数据:3SamplerItem
           

throttle

‘节流’:当发生函数调用时,执行函数并开始计时,指定时长内的再次调用则会被舍弃,直至计时结束。类比到序列中:原始序列发射数据时,如果该次发射在上一次发射的计时周期中,则最终序列不会发射该元素,如果不在,则会发射,并同时开启新计时周期。 实际案例:赞->取消赞->赞->取消赞… ,为了防止用户的疯狂点击给服务器造成不必要的压力,可以设置一个点击事件最短执行间隔,比如1s ,那么用户在1s内的多次点击只会真正触发一次点击事件。

RxSwift中该操作符有一个latest参数 :指计时周期结束后,最终序列是否会将原始序列最近一次发射的数据发射出去(如果最近一次发射的数据与触发计时周期的那个是同一个,则不会发射),默认是 true 。

下面demo演示的是 latest 为 false 的情况。

Observable<Int>.create { (observer) -> Disposable in
            var i = 
            Timer.scheduledTimer(withTimeInterval: , repeats: true, block: { (timer) in
                if i == {
                    print("原始序列终止: [第 \(i) 秒])")
                    observer.onCompleted()
                    timer.invalidate()
                }else{
                    print("原始序列发射数据:\(i) [第 \(i) 秒])")
                    observer.onNext(i)
                }
                i = i+
            }).fire()
            return Disposables.create()
            }.throttle(, latest: false,scheduler: MainScheduler.instance)
            .subscribe({print("观察者接收",$)})
            .disposed(by: GlobalDisposeBag)
           
输出:
原始序列发射数据:0 [第 0 秒])
观察者接收 next(0)
原始序列发射数据:1 [第 1 秒])
原始序列发射数据:2 [第 2 秒])
原始序列发射数据:3 [第 3 秒])
观察者接收 next(3)
原始序列发射数据:4 [第 4 秒])
原始序列发射数据:5 [第 5 秒])
原始序列发射数据:6 [第 6 秒])
观察者接收 next(6)
原始序列发射数据:7 [第 7 秒])
原始序列发射数据:8 [第 8 秒])
原始序列发射数据:9 [第 9 秒])
观察者接收 next(9)
原始序列终止: [第 10 秒])
观察者接收 completed
           

throttle(latest 为 true的情况)

注:最终序列一旦发射元素就会开始计时,所以last element 被发射的时候计时就开始了,而这个元素就会被认为是计时周期内的第一个元素。

Observable<Int>.create { (observer) -> Disposable in
            var i = 
            Timer.scheduledTimer(withTimeInterval: , repeats: true, block: { (timer) in
                if i == {
                    print("原始序列终止: [第 \(i) 秒])")
                    observer.onCompleted()
                    timer.invalidate()
                }else{
                    print("原始序列发射数据:\(i) [第 \(i) 秒])")
                    observer.onNext(i)
                }
                i = i+
            }).fire()
            return Disposables.create()
            }.throttle(, scheduler: MainScheduler.instance)
            .subscribe({print("订阅者接收",$)})
            .disposed(by: GlobalDisposeBag)
           
输出:
原始序列发射数据:0 [第 0 秒])
订阅者接收 next(0)
原始序列发射数据:1 [第 1 秒])
原始序列发射数据:2 [第 2 秒])
订阅者接收 next(2)
原始序列发射数据:3 [第 3 秒])
原始序列发射数据:4 [第 4 秒])
原始序列发射数据:5 [第 5 秒])
订阅者接收 next(5)
原始序列发射数据:6 [第 6 秒])
原始序列发射数据:7 [第 7 秒])
订阅者接收 next(7)
原始序列发射数据:8 [第 8 秒])
原始序列发射数据:9 [第 9 秒])
原始序列终止: [第 10 秒])
订阅者接收 next(9)
订阅者接收 completed
           

debounce

‘防抖’:每次调用函数时会开始计时,当时指定时长内未有再次调用,则会执行函数,如果指定时长内有新的调用,则重新计时。简单点说就是一定时间的内所有函数调用只有最后一次生效。类比到序列中:每次调用 onNext 时会开始计时,当时指定时长内未有再次调用,则内部会发射该元素。 实际场景:对输入框的用户输入进行关键词联想,如果用户输入的过快则不请求联想接口,当用户停止输入或输入变慢时发起请求

let subject = PublishSubject<Int>()
let dateFormatString = "HH:mm:ss.ms"
let debounceSeq = subject.debounce(, scheduler: MainScheduler.instance)
debounceSeq.subscribe({print("观察者 received:",$,"at time:",curDateString(dateFormatString))})
    .disposed(by: GlobalDisposeBag)
print("call onNext 1 at time:",curDateString(dateFormatString))
subject.onNext()
print("call onNext 2 at time:",curDateString(dateFormatString))
subject.onNext()
print("call onNext 3 at time:",curDateString(dateFormatString))
subject.onNext()
delay() {
    print("call onNext 4 at time:",curDateString(dateFormatString))
    subject.onNext()
}
           
输出:
call onNext  at time: ::
call onNext  at time: ::
call onNext  at time: ::
观察者 received: next() at time: ::
call onNext  at time: ::
观察者 received: next() at time: ::
           

- Transforming Operators

map

将原始序列发射的数据进行一次转换,然后在最终序列中发射。

Observable.of(, , )
    .map { String($0)+"a" }
    .subscribe({ print($0) })
    .disposed(by: GlobalDisposeBag)
           
输出:
next(a)
next(a)
next(a)
completed
           

flatMap

将原始序列发射的数据变换成序列,最终序列上发射的是这些变换序列发射的数据。 从降维角度: 原始序列发射的每个数据被映射为一个序列,但是最终序列发射的是映射的那些序列所发射的数据。 从异步操作的角度:item -> Observalbe 表示一个异步操作的结果,触发了另外一个异步操作。

代码部分详见Demo项目中的FlatMapDemoVC
           

flatMapLatest

将原始序列发射的数据变换成序列,最终序列上发射的是最近一次变换产生的序列发射的元素。

代码部分详见Demo项目中的FlatMapLatestDemoVC
           

scan

该操作符拥有’累积计算’效果。 原始序列发射数据时,会使用 scan 操作符提供的函数进行计算,返回一个新数据,然后在最终序列上发射出去,每次 scan计算的结果会参与下一次计算。

Observable.of(, , )
            .scan() { aggregateValue, newValue in
                print(aggregateValue, newValue)
                return aggregateValue + newValue
            }
            .subscribe(onNext: { print("观察者接收到:",$) })
            .disposed(by: GlobalDisposeBag)
           
输出:
1 10
观察者接收到: 11
11 100
观察者接收到: 111
111 1000
观察者接收到: 1111
           

buffer

原始序列发射数据时,最终序列会将数据缓存到一个容量为 N,有效时长为 M 的缓冲区内。当缓冲区被填满或者失效时,缓冲区内的所有数据会被最终序列打包进一个数组,然后整个数组做为一条数据被最终序列发射出去。然后刷新缓冲区(清理数据、状态置为有效),如此循环往复。 比如等电梯,装满人或者5秒钟没人进来,就关门。

subject.buffer(timeSpan: , count: , scheduler: MainScheduler.instance)
    .subscribe({print($)}).disposed(by: GlobalDisposeBag)
subject.onNext()
subject.onNext()
subject.onNext()
subject.onNext()
//、、会先输出,因为达到了 count 条件
//会过一秒输出,因为达到 timeSpan 条件 (,,输出后,缓冲区重置)
           
输出:
next([, , ])
next([])
next([])
next([])
................
           

buffer (最终序列异常终止场景)

原始序列异常终止时,缓冲区的数据不会被输出,最终序列直接异常终止。

let subject = PublishSubject<Int>()
subject.buffer(timeSpan: , count: , scheduler: MainScheduler.instance)
    .subscribe({print($)}).disposed(by: GlobalDisposeBag)
subject.onNext()
subject.onNext()
subject.onNext()
subject.onNext()
subject.onError(ExampleError)
           
输出:
next([, , ])
error(Error Domain=RxExampleError Code= "(null)" UserInfo={msg=这是一个示例错误})
           

buffer (最终序列正常终止场景)

let subject = PublishSubject<Int>()
subject.buffer(timeSpan: , count: , scheduler: MainScheduler.instance)
    .subscribe({print($)}).disposed(by: GlobalDisposeBag)
subject.onNext()
subject.onNext()
subject.onNext()
subject.onNext()
subject.onCompleted()
           
输出:
next([, , ])
next([])
completed
           

window

原始序列发射数据时,最终序列会把数据发送到一个容量为 N,有效时长为 M 的窗口内,同时最终序列会把窗口发射出去,观察者会监听窗口,当有数据被发送到窗口,观察者则会接收到数据。如果窗口被填满或窗口失效,则会创建一个新的窗口(窗口创建时就会被发射出去)。简单点说,window 返回的序列发射的是一个序列,该序列就是用于获取窗口数据的。

let subject = PublishSubject<Int>()
subject.window(timeSpan: , count: , scheduler: MainScheduler.asyncInstance)
    .flatMap({ (seq) -> Observable<Int> in
        print("接收到的最终序列的发射的数据(窗口 observable)",seq)
        return seq
    })
    .subscribe({print($)}).disposed(by: GlobalDisposeBag)

subject.onNext()
subject.onNext()
subject.onNext()
subject.onNext()
           
输出:
接收到的最终序列的发射的数据(窗口 observable) RxSwift.AddRef<Swift.Int>
next()
next()
next()
接收到的最终序列的发射的数据(窗口 observable) RxSwift.AddRef<Swift.Int>
next()
接收到的最终序列的发射的数据(窗口 observable) RxSwift.AddRef<Swift.Int>
接收到的最终序列的发射的数据(窗口 observable) RxSwift.AddRef<Swift.Int>
接收到的最终序列的发射的数据(窗口 observable) RxSwift.AddRef<Swift.Int>
接收到的最终序列的发射的数据(窗口 observable) RxSwift.AddRef<Swift.Int>
.........
           

groupBy

原始序列发射的数据,会根据指定的条件被分配到相应的’分组序列’中,’分组序列’会做为最终序列的数据被发射出去,一个’分组序列’只会被发射一次,当有新的分组产生时,最终序列才会发射’分组序列’。观察者可以通过监听不同的分组序列以得到对应分组的数据。

let groupSeq = Observable<Int>.of(-,-,,,,,,,,,,,,,).groupBy { (score) -> String in
            if score <  && score >= {
                return "不及格"
            }else if score >=  && score < {
                return "良好"
            }else if score >=  && score <= {
                return "优秀"
            }else{
                return "invalid"
            }
            }.publish()
/* 筛选出分组序列 */
let bujigeSeq = groupSeq.filter({ (item: GroupedObservable<String,Int>) -> Bool in
    if item.key.contains("不及格"){
        return true
    }
    return false
}).flatMap { (groupedSeq: GroupedObservable<String,Int>) -> Observable<Int> in
    return groupedSeq.asObservable()
    }
bujigeSeq.subscribe{print("不及格-观察者",$)}.disposed(by: GlobalDisposeBag)

let lianghaoSeq = groupSeq.filter({ (item: GroupedObservable<String,Int>) -> Bool in
    if item.key.contains("良好"){
        return true
    }
    return false
}).flatMap { (groupedSeq: GroupedObservable<String,Int>) -> Observable<Int> in
    return groupedSeq.asObservable()
    }
lianghaoSeq.subscribe{print("良好-观察者",$)}.disposed(by: GlobalDisposeBag)

let youxiuSeq = groupSeq.filter({ (item: GroupedObservable<String,Int>) -> Bool in
    if item.key.contains("优秀"){
        return true
    }
    return false
}).flatMap { (groupedSeq: GroupedObservable<String,Int>) -> Observable<Int> in
    return groupedSeq.asObservable()
    }
youxiuSeq.subscribe{print("优秀-观察者",$)}.disposed(by: GlobalDisposeBag)

groupSeq.connect().disposed(by: GlobalDisposeBag)
           
输出:
不及格-观察者 next()
不及格-观察者 next()
不及格-观察者 next()
不及格-观察者 next()
不及格-观察者 next()
良好-观察者 next()
良好-观察者 next()
良好-观察者 next()
优秀-观察者 next()
优秀-观察者 next()
优秀-观察者 next()
不及格-观察者 completed
良好-观察者 completed
优秀-观察者 completed
           

- Convert Operator

toArray

将原始序列中的数据打包成数组,由最终序列发射出去。需要原始序列正常终止后,才会打包发射。如果原始序列异常终止,则最终序列也会异常终止。

let seq = Observable<Int>.create { (observer) -> Disposable in
    var i = 
    Timer.scheduledTimer(withTimeInterval: , repeats: true, block: { (timer) in
        if i <= {
            observer.onNext(i)
            i = i+
        }else{
            observer.onCompleted()
            //如果是 error ,则不会发射元素,只会发射 error 事件。
        }
    }).fire()
    return Disposables.create()
    }.toArray()
print("开始订阅, time:", Date())
seq.subscribe{print("接收到数据: ",$," time: ",Date())}.disposed(by: GlobalDisposeBag)
           
输出:
开始订阅, time: 2018-03-23 04:49:29 +0000
接收到数据:  next([1, 2, 3, 4])  time:  2018-03-23 04:49:33 +0000
接收到数据:  completed  time:  2018-03-23 04:49:33 +0000
           

- Mathematical and Aggregate Operators

reduce

与数组的 reduce 类似,所有元素参与’累加’运算,将最终结果发射出去。

Observable.of(, , ).reduce("a", accumulator: { (aggregate, ele) -> String in
    return aggregate + String(ele)
})
.subscribe{ print($) }
.disposed(by: disposeBag)
           
输出:
next(a123)
completed
           

concat

将多个序列合并,最终序列按顺序发射各个序列发射的数据。只有前面的序列发射结束了,后面序列发射的数据才会被最终序列发射。注意的问题:当后面的序列是 ‘hot’序列时,此时,在前面序列发射完成前,后面序列所发射的那些数据将不会被 最终序列 发射。 异常终止行为:当任何一个序列异常终止,则’最终序列’会异常终止。(即使是还没轮到发射的那些序列的异常终止) 正常终止行为: 当所有序列正常终止,则’最终序列’正常终止。

let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>.init()
let subject2 = PublishSubject<String>.init()
Observable<String>.concat(subject1,subject2).subscribe{print($)}.disposed(by: disposeBag)

subject1.onNext("1")
subject1.onNext("2")

subject2.onNext("这个元素将不会输出")

subject1.onCompleted()

subject2.onNext("a")
subject2.onNext("b")
subject2.onCompleted()
           
输出:
next()
next()
next(a)
next(b)
completed
           

concat (被 concat 中的序列中有异常终止的场景)

当任何一个序列异常终止,则’最终序列’会异常终止。(即使是还没轮到发射的那些序列的异常终止)

let disposeBag = DisposeBag()
let subject1 = PublishSubject<String>.init()
let subject2 = PublishSubject<String>.init()
Observable<String>.concat(subject1,subject2).subscribe{print($)}.disposed(by: disposeBag)

subject1.onNext("1")
subject1.onNext("2")
//即使此时最终序列还不接收 subject2 的发射数据,但是它的异常终止也会让最终序列异常终止。
subject2.onError(ExampleError)

subject1.onCompleted()

subject2.onNext("a")
subject2.onNext("b")
subject2.onCompleted()
           
输出:
next()
next()
error(Error Domain=RxExampleError Code= "(null)" UserInfo={msg=这是一个示例错误})
           

- Connectable Operators

‘Connectable序列’ 是一种特别的序列,它可以更精确的动态的控制订阅行文。Connectable 序列并不会在观察者订阅时就开始发射数据,而是需要主动调用 connect() 函数以开始发射数据。并且订阅行为不会触发序列的被订阅逻辑(shares a single subscription to the underlying sequence.)

publish

将普通序列转换成 Connectable 序列

//此时订阅后,并不会让序列发射元素
let seq = Observable<Int>.of(,,).publish()
seq.subscribe{ print($) }.disposed(by: DisposeBag())
           
没有输出
           

connect

使序列开始发射数据

let seq = Observable<Int>.interval(, scheduler: MainScheduler.instance).publish()
seq.subscribe{ print("观察者1:",$) }.disposed(by: GlobalDisposeBag)
seq.connect().disposed(by: GlobalDisposeBag)
//后续的订阅会 shares single subscription
delay() {
    seq.subscribe{ print("观察者2:",$) }.disposed(by: GlobalDisposeBag)
}
//后续的订阅会 shares single subscription
delay() {
    seq.subscribe{ print("观察者3:",$) }.disposed(by: GlobalDisposeBag)
}
           
输出:
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
观察者: next()
....................
           

replay

观察者订阅时,序列会将先前发射过的最近N个数据先发射给观察者

let seq = Observable<Int>.interval(, scheduler: MainScheduler.instance).replay()
seq.subscribe{ print("订阅者1 接收到数据:",$) }.disposed(by: GlobalDisposeBag)
seq.connect().disposed(by: GlobalDisposeBag)
delay() {
    seq.subscribe{ print("订阅者2 接收到数据:",$) }.disposed(by: GlobalDisposeBag)
}
//会接收到订阅前发射的最近个元素
delay() {
    seq.subscribe{ print("订阅者3 接收到数据",$) }.disposed(by: GlobalDisposeBag)
}
           
输出:
订阅者 接收到数据: next()
订阅者 接收到数据: next()
订阅者 接收到数据: next()
订阅者 接收到数据: next()
订阅者 接收到数据: next()
订阅者 接收到数据: next()
订阅者 接收到数据: next()
订阅者 接收到数据: next()
订阅者 接收到数据: next()
订阅者 接收到数据 next()
订阅者 接收到数据 next()
订阅者 接收到数据: next()
订阅者 接收到数据: next()
订阅者 接收到数据 next()
订阅者 接收到数据: next()
订阅者 接收到数据: next()
订阅者 接收到数据 next()
订阅者 接收到数据: next()
订阅者 接收到数据: next()
订阅者 接收到数据 next()
.............................
           

replayAll

观察者订阅时,序列会将先前发射过的数据全部先发射给观察者

let seq = Observable<Int>.interval(, scheduler: MainScheduler.instance).replayAll()
seq.subscribe{ print("订阅1:",$) }.disposed(by: GlobalDisposeBag)
seq.connect().disposed(by: GlobalDisposeBag)
delay() {
    seq.subscribe{ print("订阅2:",$) }.disposed(by: GlobalDisposeBag)
}
delay() {
    seq.subscribe{ print("订阅3:",$) }.disposed(by: GlobalDisposeBag)
}
           
输出:
订阅: next()
订阅: next()
订阅: next()
订阅: next()
订阅: next()
订阅: next()
订阅: next()
订阅: next()
订阅: next()
订阅: next()
订阅: next()
订阅: next()
订阅: next()
订阅: next()
订阅: next()
订阅: next()
订阅: next()
订阅: next()
..............
           

multicast

数据会通过指定的Subject发射给订阅者

let subject = PublishSubject<Int>()
_ = subject
    .subscribe{ print("Subject 观察者: \($0)") }

let intSequence = Observable<Int>
    .interval(, scheduler: MainScheduler.instance)
    .multicast(subject)

intSequence.subscribe{ print("\tintSequence观察者 1:, Event: \($0)") }.disposed(by: GlobalDisposeBag)
intSequence.subscribe{ print("\tintSequence观察者 2:, Event: \($0)") }.disposed(by: GlobalDisposeBag)
intSequence.subscribe{ print("\tintSequence观察者 3:, Event: \($0)") }.disposed(by: GlobalDisposeBag)

//秒后开始发射数据
delay() { intSequence.connect().disposed(by: GlobalDisposeBag)}

//subject, 秒后终止
delay() {
    subject.onCompleted()
}
           
输出:
Subject 观察者: next()
    intSequence观察者 :, Event: next()
    intSequence观察者 :, Event: next()
    intSequence观察者 :, Event: next()
Subject 观察者: next()
    intSequence观察者 :, Event: next()
    intSequence观察者 :, Event: next()
    intSequence观察者 :, Event: next()
Subject 观察者: completed
    intSequence观察者 :, Event: completed
    intSequence观察者 :, Event: completed
    intSequence观察者 :, Event: completed
           

- Error Handling Operators

catchErrorJustReturn

当捕获到 error 事件后,序列会用预先设置好的数据发射出去,然后正常终止。

let sequenceThatFails = PublishSubject<String>()

sequenceThatFails
    .catchErrorJustReturn("Catch it")
    .subscribe { print($) }
    .disposed(by: GlobalDisposeBag)

sequenceThatFails.onNext("1")
sequenceThatFails.onNext("2")
sequenceThatFails.onError(ExampleError)
sequenceThatFails.onNext("3")
sequenceThatFails.onNext("4")
           
输出:
next()
next()
next(Catch it)
completed
           

catchError

当捕获到 error 事件后,切换到另外一个序列发射数据

let sequenceThatFails = PublishSubject<String>()
let recoverySequence = PublishSubject<String>()

sequenceThatFails
    .catchError {_ in
        return recoverySequence
    }
    .subscribe { print($) }
    .disposed(by: GlobalDisposeBag)

sequenceThatFails.onNext("1")
sequenceThatFails.onNext("2")
sequenceThatFails.onNext("3")
sequenceThatFails.onNext("4")
sequenceThatFails.onError("Some Error")
recoverySequence.onNext("r3")
recoverySequence.onNext("r4")
           
输出:
next()
next()
next()
next()
next(r3)
next(r4)
           

retry

当序列发出 error 事件后,会重新订阅序列,以让该序列重新发射数据。

var count = 
let sequenceThatErrors = Observable<String>.create { observer in
    observer.onNext("1")
    observer.onNext("2")
    observer.onNext("3")

    if count ==  {
        observer.onError(ExampleError)
        count += 
    }

    observer.onNext("4")
    observer.onNext("5")
    observer.onNext("6")
    observer.onCompleted()

    return Disposables.create()
}
sequenceThatErrors
    .retry()
    .subscribe { print($) }
    .disposed(by: GlobalDisposeBag)
           
输出:
next()
next()
next()
next()
next()
next()
next()
next()
next()
completed
           

retryMaxAttemptCount

与 retry 行为一样,只是多了一个最多尝试次数的限制

let sequenceThatErrors = Observable<String>.create { observer in
    observer.onNext("1")
    observer.onNext("2")
    observer.onNext("3")

    observer.onError(ExampleError)

    observer.onNext("4")
    observer.onNext("5")
    observer.onNext("6")
    observer.onCompleted()
    return Disposables.create()
}
sequenceThatErrors
    .retry()
    .subscribe { print($) }
    .disposed(by: GlobalDisposeBag)
           
输出:
next()
next()
next()
next()
next()
next()
next()
next()
next()
error(Error Domain=RxExampleError Code= "(null)" UserInfo={msg=这是一个示例错误})
           

retryWhen

序列失败时,等待’通知序列’发射元素,一旦’通知序列’发射数据,原始序列则会进行retry操作,如果通知序列正常或异常终止,则原始序列同样正常或异常终止。

let notifyer = PublishSubject<String>()
var count = 
let sequenceThatErrors = Observable<String>.create { observer in
    observer.onNext("1")
    observer.onNext("2")
    observer.onNext("3")
    if count == {
        observer.onError("Some Error")
        count = count + 
    }
    observer.onNext("4")
    observer.onNext("5")
    observer.onNext("6")
    observer.onCompleted()
    return Disposables.create()
}
sequenceThatErrors
    .retryWhen({ (errorSeq:Observable<Error>) -> Observable<String> in
        return notifyer
    })
    .subscribe { print("接收到数据: ",$," time: ",Date()) }
    .disposed(by: GlobalDisposeBag)

delay() {
    print("通知序列发射元素, time: ", Date())
    notifyer.onNext("a")
    //notifyer.onCompleted() //以 Completed 事件结束
    //notifyer.onError("Some Error") // 以 error 事件结束
           
输出:
接收到数据:  next(1)  time:  2018-03-23 05:10:18 +0000
接收到数据:  next(2)  time:  2018-03-23 05:10:18 +0000
接收到数据:  next(3)  time:  2018-03-23 05:10:18 +0000
通知序列发射元素, time:  2018-03-23 05:10:20 +0000
接收到数据:  next(1)  time:  2018-03-23 05:10:20 +0000
接收到数据:  next(2)  time:  2018-03-23 05:10:20 +0000
接收到数据:  next(3)  time:  2018-03-23 05:10:20 +0000
接收到数据:  next(4)  time:  2018-03-23 05:10:20 +0000
接收到数据:  next(5)  time:  2018-03-23 05:10:20 +0000
接收到数据:  next(6)  time:  2018-03-23 05:10:20 +0000
接收到数据:  completed  time:  2018-03-23 05:10:20 +0000
           

- Conditional and Boolean Operators

amb

最终序列只会发射被 amb 组合的那些序列中最先发射数据的序列的数据。

//设置序列的订阅逻辑该如何调度
let seq1 = PublishSubject<String>()
let seq2 = PublishSubject<String>()
let seq3 = PublishSubject<String>()
let ambSeq = seq1.amb(seq2).amb(seq3)
ambSeq.subscribe{print($)}.disposed(by: GlobalDisposeBag)

//序列 先发射,则 ambSeq 只会发射序列的数据
seq2.onNext("seq2_first")

seq1.onNext("seq1_a")
seq1.onNext("seq1_b")

seq2.onNext("seq2_a")
seq2.onNext("seq2_b")

seq3.onNext("seq3_a")
seq3.onNext("seq3_b")
           
输出:
next(seq2_first)
next(seq2_a)
next(seq2_b)
           

ifEmpty

ifEmpty 用来处理序列未发射数据就终止的场景 原始序列发射了数据,则 ifEmpty 不会生效。 原始序列未发射数据:

  • 如果原始序列正常终止,最终序列就会发射一个 ifEmpty 预先设置好的数据
  • 如果原始序列异常终止,ifEmpty 预先设置的数据不会被发射,同时最终序列异常终止。

下面是 原始序列发射了数据,ifEmpty不生效场景

let seq1 = PublishSubject<String>()
seq1.ifEmpty(default: "placeholder")
    .subscribe{print($)}
    .disposed(by: GlobalDisposeBag)
seq1.onNext("item1")
seq1.onCompleted()
           
输出:
next(item1)
completed
           

ifEmpty (原始序列正常终止场景)

let seq1 = PublishSubject<String>()
seq1.ifEmpty(default: "placeholder")
    .subscribe{print($)}
    .disposed(by: GlobalDisposeBag)
seq1.onCompleted()
           
输出:
next(placeholder)
completed
           

ifEmpty (原始序列异常终止场景)

let seq1 = PublishSubject<String>()
seq1.ifEmpty(default: "placeholder")
    .subscribe{print($)}
    .disposed(by: GlobalDisposeBag)
seq1.onError(ExampleError)
           
输出:
error(Error Domain=RxExampleError Code= "(null)" UserInfo={msg=这是一个示例错误})
           

ifEmptySwitchTo

如果原始序列未发射数据就终止,那么观察者就切换订阅至指定的序列上

let sourceSeq = PublishSubject<String>()
let standbySeq = PublishSubject<String>()

sourceSeq.ifEmpty(switchTo: standbySeq).subscribe{print($0)}.disposed(by: GlobalDisposeBag)

sourceSeq.onCompleted()
standbySeq.onNext("备用队列 item1")
standbySeq.onCompleted()
           
输出:
next(备用队列 item1)
completed