看个例子:
const myObservable = of(1, 2, 3);
// 创建一个观察者对象-Observer(处理next、error、complete回调)
const myObserver = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
// 通过Observable的subscribe函数,观察者去订阅可观察者的消息
myObservable.subscribe(myObserver);
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5SNxY2YxUTNmFTN2YmNygDO2kzN0YzYzIGNmJTNxATMl9CX5d2bs92Yl1iclB3bsVmdlR2LcNWaw9CXt92Yu4GZjlGbh5yYjV3Lc9CX6MHc0RHaiojIsJye.png)
调用Observable的subscribe方法,传入一个包含回调函数的observer对象:
后两个参数都是undefined:
在toSubscriber函数里,因为nextOrObserver是我手动传入的对象,所以前两个IF条件均不满足:
进入默认实现,新建一个Subscriber对象:
Subscriber是Subscription的子类:
我们现在的Subscriber的构造函数里,创建一个SafeSubscruber实例:this作为parent subscriber传入
从SafeSubscriber的实现能看出,传入的Observer对象的next,error和complete这些函数名称都是硬编码的,必须符合这个命名规范:
执行subscribe:
sink的destination包含了应用程序传入的complete, next, error逻辑:
这里能看到,subscribe的逻辑就是,遍历所有Observable参数,依次调用observer的next方法,最后再调用一次complete方法:
this._next调用this.destination.next:
最终调用到应用程序员传入的next方法:
最后的输出: