RxSwift(1)源码分析-序列的创建及订阅

OCRACRxSwift是一个针对于Swift版本的响应式编程框架。经过对官网和一些源码的阅读,决定做一些总结,方便以后知其所以然。

在RxSwift的使用中,我们需要先创建序列,然后订阅信号、发送信号来实现一个完整的流程。这边文章便针对这一流程进行简单解读和总结。

代码示例

附上使用代码,可见有三大流程

1
1.创建序列 => 2.订阅信号 => 3.发送信号。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 1:创建序列
let observable = Observable<Any>.create { (obserber) -> Disposable in
// 3:发送信号
obserber.onNext("发送信号")
obserber.onCompleted()
//obserber.onError(NSError.init(domain: "fail", code: 10087, userInfo: nil))
return Disposables.create()
}

// 2:订阅信号
let _ = observable.subscribe(onNext: { (text) in
print("订阅到信号:\(text)")
}, onError: { (error) in
print("error: \(error)")
}, onCompleted: {
print("完成")
}) {
print("销毁")
}

分析以上代码,我们得知2中闭包的执行是依赖1中闭包的执行的,但是1中闭包何时执行就不得而知,接着我们便挨着分析每个流程的源码。

分析Creat函数

首先来看Creat的定义函数。可知可观察序列的创建是利用协议拓展功能的creat方法实现,返回了一个AnonymousObservable(匿名可观察序列)。

1
2
3
4
5
6
7
//creat.swift文件中。
extension ObservableType {
// MARK: create
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
}

进入到AnonymousObservable这个类中,如下可知:其内部用self._subscribeHandler = subscribeHandler这样一句代码来保存创建函数的闭包A。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//AnonymousObservable就是一个内部类,具备一些通用特性
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

let _subscribeHandler: SubscribeHandler

init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}

override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}

关于AnonymousObservable的继承关系如下:

1
AnonymousObservable => Producer => Observable => ObservableType => ObservableConvertibleType

分析Subscribe函数

通过以下函数可知,函数中创建了一个AnonymousObserver的对象。最后调用了self.asObservable().subscribe(observer),通过subscribe函数把创建好的AnonymousObserver传递到AnonymousObservable这个类中。相当于调用了闭包A,并完成了传值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
//ObservableType+Extensions 中
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable

if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}

#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif

let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
//创建匿名观察者。
//初始化的时候传递闭包参数。
//E 这里的意思是 Swift 的关联类型,这个如果仔细看过可观察序列的继承链源码应该不难得出:这个E 就是我们的 序列类型,我们这里就是String
let observer = AnonymousObserver<E> { event in

#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif

switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
//将创建好的AnonymousObserver传递给AnonymousObservable
//将观察传递给观察者。
self.asObservable().subscribe(observer),
disposable
)
}

AnonymousObserver是匿名观察者,用于存储和处理事件的闭包。定义如下:

可知这里用_eventHandler保存了外面传入的事件闭包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final class AnonymousObserver<Element>: ObserverBase<Element> {
typealias EventHandler = (Event<Element>) -> Void

private let _eventHandler : EventHandler

init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
//将subcrible的参数闭包 保存到这个变量中。
self._eventHandler = eventHandler
}

override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}

#if TRACE_RESOURCES
deinit {
_ = Resources.decrementTotal()
}
#endif
}

关于self.subscribe(observer)传值,其实是通过可观察序列的继承关系,通过Producer中方法的具体实现来完成相应效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//Producer类中
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
....省略此处代码。
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
....省略此处代码。
return disposer
}
}
}

可知:self.run 这个代码最终由我们生产者 Producer 延伸到我们具体的事务代码 AnonymousObservable.run,并将Observer对象传过来, 定义如下。

这里通过使用AnonymousObservableSink的中间对象,将可观察者Observable和观察者Observer链接起来,实现事件的传递,起到一个桥梁的作用。

1
2
3
4
5
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)//这里的
return (sink: sink, subscription: subscription)
}

sink.run的执行可追溯到AnonymousObservableSink中:

1
2
3
4
5
6
//parent指传过来的AnonymousObservable对象
func run(_ parent: Parent) -> Disposable {
//这里调用了闭包A,传值为AnyObserver(self)
//这里解释了create的闭包什么时候执行
return parent._subscribeHandler(AnyObserver(self))
}

AnyObserver(self): 把AnonymousObservableSink对象转换成AnyObserver对象。

AnyObserver源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
public struct AnyObserver<Element> : ObserverType {
public typealias E = Element

public typealias EventHandler = (Event<Element>) -> Void

private let observer: EventHandler
... 省略其它代码
public init<O : ObserverType>(_ observer: O) where O.E == Element {
// 把AnonymousObservableSink类的on函数赋值给AnyObserver类的observer变量。从这里就可以明白为什么这行代码observer.onNext("发送信号") 最终会触发AnonymousObservableSink.on事件。
self.observer = observer.on
}
}

事件的发送

AnonymousObservableSink中,关于on方法的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// AnonymousObservableSink类on方法
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}

self.forwardOn(event),进入到Sink中的forwardOn方法。

1
2
3
4
5
6
7
8
9
10
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}

这里的_observer就是第二步调用subscribe函数里面创建的observer对象。
会先进入到父类的ObserverBaseon方法

1
2
3
4
5
6
7
8
9
10
11
12
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
1
2
3
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}

这里的_eventHandler闭包就是第二步调用subscribe函数传入的闭包。

总结

至此,大概流程已解读完毕,用一张图来总结下。