天天看点

RxSwift特征序列

一、概述

二、Single

三、Completable

四、Maybe

五、Driver

六、Signal

七、ControlEvent

RxSwift特征序列

一、概述

任何序列都可以用

Observable

描述,创建序列 -> 订阅序列 -> 信号发送 -> 信号接收。

Observable<Any>.create { (observer) -> Disposable in
    observer.onNext("信号1")
    return Disposables.create()
}.subscribe(onNext: { (val) in
    print("信号接收区:\(val)")
}).disposed(by: disposeBag)
           

Observable

是通用序列的描述符,调用

.onNext

.onError

onCompleted

来发送信号,通用性强,但针对特殊需求可能会觉得繁琐,因此

RxSwift

还提供了一组特征序列,是

Observable

序列的变种,它能够帮助我们更准确的描述序列。即

Single

Completable

Maybe

Driver

Signal

ControlEvent

二、Single

1、定义

单元素序列,信号只发送一次,响应信号或错误信号。

Single<Any>.create { (single) -> Disposable in
            single(.success("假装我是一个正儿八经的数据"))
            //single(.error(NSError.init(domain: "网络出现错误", code: 101, userInfo:["name":"hibo"])))
            return Disposables.create()
        }.subscribe(onSuccess: { (val) in
            print("Single:\(val)")
        }) { (error) in
            print("SingleError:\(error)")
        }.disposed(by: disposeBag)
           
  • sinngle(.success(data))

    ->

    onSuccess

    发送响应元素到成功观察者
  • sinngle(.error(error))

    ->

    error

    发送错误元素到错误观察者

响应元素和错误元素分开处理,此时我们可以联想到应用中的网络请求,成功数据用来渲染,错误数则据弹出提示框。

2、源码探索

2.1、Single定义

/// Sequence containing exactly 1 element
public enum SingleTrait { }
/// Represents a push style sequence containing 1 element.
public typealias Single<Element> = PrimitiveSequence<SingleTrait, Element>

public enum SingleEvent<Element> {
    /// One and only sequence element is produced. (underlying observable sequence emits: `.next(Element)`, `.completed`)
    case success(Element)
    
    /// Sequence terminated with an error. (underlying observable sequence emits: `.error(Error)`)
    case error(Swift.Error)
}
           

定位到

Single.swift

文件,首先能看到

Single

PrimitiveSequence

结构体类型的别名,

SingleEvent

是事件枚举,有

success

error

两个成员变量。

2.2、

create

创建序列。代码如下(此处代码标记为1️⃣):

extension PrimitiveSequenceType where TraitType == SingleTrait {
    public typealias SingleObserver = (SingleEvent<ElementType>) -> Void
          //代码省略若干行
    public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single<ElementType> {
        let source = Observable<ElementType>.create { observer in
            return subscribe { event in
                switch event {
                case .success(let element):
                    observer.on(.next(element))
                    observer.on(.completed)
                case .error(let error):
                    observer.on(.error(error))
                }
            }
        }
        
        return PrimitiveSequence(raw: source)
    }
}
           

首先看参数是一个带

Disposable

类型返回值的闭包,交由外部(业务层)实现,内部调用向外传入一个

SingleObserver

闭包,以上写法不太好理解,我们可以换一种写法:

public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single<ElementType> {
    let source = Observable<ElementType>.create { observer in
        // 1、内部实现一个闭包,用来接收外界传入的SingleEvent信号,接着做进一步的信号发送
        let block = { (event:SingleEvent<ElementType>) -> Void in
            switch event {
            case .success(let element):
                observer.on(.next(element))
                observer.on(.completed)
            case .error(let error):
                observer.on(.error(error))
            }
        }
        // 2、调用外部实现的闭包方法,向外部发送内部实现的闭包方法做连接作用
        let disposable = subscribe(block)//3、返回值Disposable对象 
        return disposable
    }
    return PrimitiveSequence(raw: source)//4、创建PrimitiveSequence对象并保存Observable序列对象
}
           
  • 内部实现一个闭包

    block

    ,用来接收外界传入的

    SingleEvent

    信号,接着做进一步的信号发送
  • 调用外部实现的闭包方法,将内部实现的闭包

    block

    发送出去,起连接作用
  • 创建

    PrimitiveSequence

    对象并保存

    Observable

    序列对象

    source

    ,返回

    PrimitiveSequence

    对象

create

方法内部实际上实现了一个

Observable

序列,由此可见

Single

序列是对

Observable

序列的封装,

Disposable

对象通过闭包交由业务层创建,

Single

序列在实现上,方式方法与

Observable

保持一致,此处可称一绝。当前我们只探索

Single

的信号是如何完成传递,

Observable

序列的信号传递流程在《Swift核心源码探索》中有详细介绍。

2.3、订阅序列

也是在同

PrimitiveSequenceType

扩展中定义,代码如下(此处代码标记为2️⃣):

public func subscribe(onSuccess: ((ElementType) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil) -> Disposable {
    #if DEBUG
         let callStack = Hooks.recordCallStackOnError ? Thread.callStackSymbols : []
    #else
        let callStack = [String]()
    #endif

    return self.primitiveSequence.subscribe { event in
        switch event {
        case .success(let element):
            onSuccess?(element)
        case .error(let error):
            if let onError = onError {
                onError(error)
            } else {
                Hooks.defaultErrorHandler(callStack, error)
            }
        }
    }
}
           

方法中先调用了

self.primitiveSequence

方法,返回了

self

,方法是在遵循

PrimitiveSequenceType

协议的

PrimitiveSequence

的扩展中,为了保证协议的一致性。代码如下:

extension PrimitiveSequence: PrimitiveSequenceType {
    /// Additional constraints
    public typealias TraitType = Trait
    /// Sequence element type
    public typealias ElementType = Element

    // Converts `self` to primitive sequence.
    ///
    /// - returns: Observable sequence that represents `self`.
    public var primitiveSequence: PrimitiveSequence<TraitType, ElementType> {
        return self
    }
}
           

紧接着调用另一个

subscribe

方法,代码如下(此处代码标记为3️⃣):

public func subscribe(_ observer: @escaping (SingleEvent<ElementType>) -> Void) -> Disposable {
    var stopped = false
    return self.primitiveSequence.asObservable().subscribe { event in
        if stopped { return }
        stopped = true
        
        switch event {
        case .next(let element):
            observer(.success(element))
        case .error(let error):
            observer(.error(error))
        case .completed:
            rxFatalErrorInDebug("Singles can't emit a completion event")
        }
    }
}
           
  • self.primitiveSequence -> asObservable() -> subscribe

  • 此处截断了

    completed

    信号的向上传递,因此

    Single

    序列只能收到响应信号和错误信号

该段代码也调用了

self.primitiveSequence

方法,接着调用

asObservable()

方法,查看代码发现此处是为了获取

source

对象,即

Observable

可观察序列。

再查看

subscribe

的方法(此处标记为代码4️⃣):

public func subscribe(_ on: @escaping (Event<E>) -> Void)
    -> Disposable {
        let observer = AnonymousObserver { e in
            on(e)
        }
        return self.asObservable().subscribe(observer)
}
           
  • 代码创建了一个观察者,当前观察者将会收到发送过来的消息,并由此通过闭包一层层传到业务层。 4️⃣ -> 3️⃣ -> 2️⃣ -> 1️⃣ ->业务层
  • 当前self指向的是1️⃣处创建并保存的

    Observable

    类型的

    source

    对象,因此该处

    subscribe

    所调用的即是

    Produce

    类中的

    subscribe

    方法,在方法内部创建了

    sink

    对象,来触发创建序列时实现的闭包,即代码1️⃣处所

    create

    后的闭包
  • 此时就到了业务层,通过create内部实现的闭包

    single

    向内部发送消息,再有

    observer

    调用

    on

    来向观察者发送信号
  • 信号发送不做赘述,最终会到达4️⃣处代码的观察者,此时再由闭包一层层向上传递,直到业务层的监听闭包

总结:

序列的产生,订阅,发送,接收还是由

Observable

来实现的,

Single

只是对

Observable

做了封装,去除了

onCompleted

的消息监听及消息发送。

具体的Observable序列产生到观察流程见《Swift核心源码探索》

三、Completable

只能产生

completed

事件和

error

事件,没有序列元素值产生。

Completable.create { (completable) -> Disposable in
    completable(.completed)
    //completable(.error(NSError.init(domain: "出现异常", code: 101, userInfo: nil)))
    return Disposables.create()
}.subscribe(onCompleted: {
    print("Completable")
}) { (error) in
    print(error)
}.disposed(by: disposeBag)
           
  • 应用场景,只关心任务是否完成,不关心不需要结果
  • Competable.swift

    下,在

    PrimitiveSequenceType

    扩展中实现了序列的创建,订阅,即信号转发

定义如下:

/// Sequence containing 0 elements
public enum CompletableTrait { }
/// Represents a push style sequence containing 0 elements.
public typealias Completable = PrimitiveSequence<CompletableTrait, Swift.Never>

public enum CompletableEvent {
    /// Sequence terminated with an error. (underlying observable sequence emits: `.error(Error)`)
    case error(Swift.Error)
    
    /// Sequence completed successfully.
    case completed
}
           

同样

Completable

类也是

PrimitiveSequence

的别名,并声明一个枚举包含,

error

completed

成员变量,限定了事件产生类型。都是对

Observable

序列的封装,源码此处不做探索说明,和

Single

一致,只是在订阅阶段对

.next

事件做了拦截。

四、Maybe

Single

序列相似,发出一个元素或一个

completed

事件或

error

事件。

Maybe<Any>.create { (maybe) -> Disposable in
        maybe(.success("element"))
        //maybe(.completed)
        //maybe(.error(NSError.init(domain: "出现异常", code: 101, userInfo: nil)))
        return Disposables.create()
    }.subscribe(onSuccess: { (val) in
        print(val)
    }, onError: { (error) in
        print("error:\(error)")
    }) {
        print("completed")
    }.disposed(by: disposeBag)
           

在开发中,如果一个业务有时候需要一个元素,有时候只需要知道处理完成的时候,可以使用该

Maybe

,解决不确定需求问题。源码探索略,同上。

五、Driver

  • Driver

    序列不会产生

    error

    事件
  • 在主线程中监听,会向新订阅者发送上次发送过的元素,简化

    UI

    层的代码
  • 共享序列

下面看一下为什么会扩展一个

Driver

序列。

有一个需求:

  • 搜索框中每次输入一个文本,获取一次网络请求,成功后渲染

    UI

    ,多个控件显示

先实现一个简单的

UI

let tf = UITextField.init(frame: CGRect(x: 100, y: 100, width: 200, height: 40))
tf.borderStyle = .roundedRect
tf.placeholder = "请输入"
self.view.addSubview(tf)

let label1 = UILabel.init(frame: CGRect(x: 100, y: 160, width: 200, height: 40))
label1.backgroundColor = .groupTableViewBackground
label1.textAlignment = .center
self.view.addSubview(label1)

let label2 = UILabel.init(frame: CGRect(x: 100, y: 210, width: 200, height: 40))
label2.backgroundColor = .groupTableViewBackground
label2.textAlignment = .center
self.view.addSubview(label2)
           

创建了一个

textfield

,两个

label

用来展示。下面在来实现一个网络请求,返回一个

Single

序列:

func network(text:String) -> Single<Any> {
    return Single<Any>.create(subscribe: { (single) -> Disposable in
        if text == "1234"{
            single(.error(NSError.init(domain: "出现错误", code: 101, userInfo: nil)))
        }
        DispatchQueue.global().async {
            print("请求网络")
            single(.success(text))
        }
        return Disposables.create()
    })
}
           

网络请求为耗时操作,因此我们在异步中来完成,直接发送序列,假装我们请求了一次网络。

1、普通方法实现

textfield

输入序列的监听,并调取网络请求方法:

let result = tf.rx.text.orEmpty.skip(1)
                .flatMap{
                    return self.network(text: $0)
                        .observeOn(MainScheduler.instance)
                        .catchErrorJustReturn("网络请求失败")
            }.share(replay: 1, scope: .whileConnected)
//网络请求将发送多次请求
result.subscribe(onNext: { (val) in
    print("订阅一:\(val) 线程:\(Thread.current)")
}).disposed(by: disposeBag)

result.subscribe(onNext: { (val) in
    print("订阅二:\(val) 线程:\(Thread.current)")
}).disposed(by: disposeBag)

result.map{"\(($0 as! String).count)"}.bind(to: label1.rx.text).disposed(by: disposeBag)
result.map{"\($0)"}.bind(to: label2.rx.text).disposed(by: disposeBag)
           
  • flatMap

    将原序列转换为Observables,将这些Observables的元素合并之后发出
  • observeOn

    选择在哪个线程执行
  • catchErrorJustReturn

    错误处理,将

    onError

    事件转为

    onNext

    事件
  • share

    为多个观察者共享资源,网络请求只发送呢一次,否则多个订阅将会触发多个请求

2、Driver实现:

let result = tf.rx.text.orEmpty
    .asDriver()
    .flatMap {
        return self.network(text: $0).asDriver(onErrorJustReturn: "网络请求失败")
    }
result.map{"长度:\(($0 as! String).count)"}
        .drive(label1.rx.text).disposed(by: disposeBag)
result.map{"\($0)"}.drive(label2.rx.text).disposed(by: disposeBag)
           
  • asDriver()

    将序列转换为

    Driver

    序列
  • map

    重新组合并生成新的序列
  • driver

    将元素在主线程中绑定到

    label1

    label2

相比非

driver

下的代码实现,

Driver

序列省去了线程的设置,

share

数据共享设置。

Driver源码探索

断点查看

asDriver()

方法:

extension ControlProperty {
    /// Converts `ControlProperty` to `Driver` trait.
    ///
    /// `ControlProperty` already can't fail, so no special case needs to be handled.
    public func asDriver() -> Driver<E> {
        return self.asDriver { _ -> Driver<E> in
            #if DEBUG
                rxFatalError("Somehow driver received error from a source that shouldn't fail.")
            #else
                return Driver.empty()
            #endif
        }
    }
}
           

ControlProperty

的扩展方法,返回了一个

Driver<E>

类,

Driver

SharedSequence

的别名,用来描述不同类型的序列,最后又调用了

asDriver

方法,而该方法在

ObservableConvertibleType

的扩展中,一直追踪会发现很多类都是继承自

ObservableConvertibleType

下。

extension ObservableConvertibleType {
    public func asDriver(onErrorRecover: @escaping (_ error: Swift.Error) -> Driver<E>) -> Driver<E> {
        let source = self
            .asObservable()
            .observeOn(DriverSharingStrategy.scheduler)
            .catchError { error in
                onErrorRecover(error).asObservable()
            }
        return Driver(source)
    }
}
           

如上代码也设置了

observerOn

方法,来指定线程,继续深入能够发现

DriverSharingStrategy.scheduler

内部指定的就是主线程,印证了上面所说的

Driver

的执行是在主线程的。最后初始化一个

Driver

对象返回,看一下初始化过程,及对

SharedSequence

类的初始化,代码如下:

public struct SharedSequence<S: SharingStrategyProtocol, Element> : SharedSequenceConvertibleType {
    public typealias E = Element
    public typealias SharingStrategy = S

    let _source: Observable<E>

    init(_ source: Observable<E>) {
        self._source = S.share(source)
    }
}
           

此处调用了

share

并传入了可观察序列,感觉好像在哪见过,此处猜想它是用来共享序列的,使用

lldb:po S.self

查找

share

所在位置:

RxCocoa.DriverSharingStrategy

cmd+点击

进入,代码如下:

public typealias Driver<E> = SharedSequence<DriverSharingStrategy, E>

public struct DriverSharingStrategy: SharingStrategyProtocol {
    public static var scheduler: SchedulerType { return SharingScheduler.make() }
    public static func share<E>(_ source: Observable<E>) -> Observable<E> {
        return source.share(replay: 1, scope: .whileConnected)
    }
}
           

在此处传入的序列用来掉用

share

,使得当前序列作为共享序列,即

driver

序列为共享序列。

六、Signal

Driver

相似,不会产生

error

事件,在主线程执行,但不会像

Driver

一样会给新观察者发送上一次发送的元素。

使用如下:

let event : Driver<Void> = button.rx.tap.asDriver()
event.drive(onNext: {
    print("yahibo")
    event.drive(onNext: {
        print("yahibo1")
    }).disposed(by: self.disposeBag)
}).disposed(by: disposeBag)
           

运行打印,发现在点击后重新订阅的观察者,会直接收到点击事件,这是我们业务不允许的。下面再看

Signal

序列:

let event : Signal<Void> = button.rx.tap.asSignal()
event.emit(onNext: {
    print("yahibo")
    event.emit(onNext: {
        print("yahibo1")
    }).disposed(by: self.disposeBag)
}).disposed(by: disposeBag)
           

运行结果,每一个新序列都会在点击后触发。

七、ControlEvent

专门用于描述

UI

控件所产生的事件,不会产生

error

事件,在主线程中监听。代码如下:

1、监听点击事件

let event : ControlEvent<Void> = button.rx.tap.asControlEvent()
event.bind(onNext: {
    print("controllerEvent")
}).disposed(by: disposeBag)
           

2、监听点击事件并绑定数据到其他UI

let event : ControlEvent<Void> = button.rx.tap.asControlEvent()
event.map{"yahibo"}.bind(to: label1.rx.text).disposed(by: disposeBag)
           

总结:

以上序列都是基于

Observable

的,是对其更高层的封装,针对不同应用场景设计,简化不同场景下序列的使用流程。