RxSwift(2)源码分析-继承链

RxSwift(1)源码分析-序列的创建及订阅)中我们提到了匿名可观察序列AnonymousObservable的的继承关系,本文将从类或协议的角度,继续分析可观察序列和观察者的继承关系。

序列继承链

如下,这是我们创建Observable所涉及到的继承关系。

1
AnonymousObservable => Producer => Observable => ObservableType => ObservableConvertibleType

由上至下,我们简单分析一下每一层。

协议 - ObservableConvertibleType

1
2
3
4
5
6
7
8
9
10
11
public protocol ObservableConvertibleType {
/// Type of elements in sequence.
associatedtype Element

@available(*, deprecated, message: "Use `Element` instead.")
typealias E = Element

/// Converts `self` to `Observable` sequence.
/// - returns: Observable sequence that represents `self`.
func asObservable() -> Observable<Element>
}

看注释可以明白,使用了关联类型提供了一个可转换为可观察序列Observable的方法,这个方法在遵守这个协议的类中实现即可。

这是最底层的协议,即可满足“万物皆序列”的目的。例如:

1
UISwitch().rx.value.asObservable();

因为value是结构体ControlProperty类型的,而ControlProperty底层又遵守了ObservableConvertibleType协议,所以最后value可以被转换为一个可观察序列。

协议 - ObservableType

这是一个可观察序列协议,于是目前提供了一个每个可观察序列都一定会有的订阅方法subscribe,只有外部订阅了该对象,才能真正实现对该对象进行观察。

1
2
3
public protocol ObservableType: ObservableConvertibleType {
func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}

类 - Observable

真正的类,可以称之为元类,对于用户来说Observable 的功能是完整的,因为它已经具备了所有的用户所需要的功能,尽管有些方法并没有得到实现仍是抽象方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Observable<Element> : ObservableType {  // Observable 可观察序列,所有序列的基类
/// Type of elements in sequence.
public typealias E = Element // 起别名

init() {
#if TRACE_RESOURCES
_ = Resources.incrementTotal() // 初始化的时候,类似于 引用计数 +1
#endif
}
// 协议的实现
public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
rxAbstractMethod() // 抽象方法,父类不实现,子类去实现
}

public func asObservable() -> Observable<E> { // 返回的是一个 Observable对象
return self
}

deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal() // 销毁的时候,类似于 引用计数 -1 ; 从而根据进入一个页面前,再退出一个页面后类似的操作,总共引用计数的变化来判断是否有内存泄漏问题
#endif
}

// this is kind of ugly I know :(
// Swift compiler reports "Not supported yet" when trying to override protocol extensions, so ¯\_(ツ)_/¯ 想优化协议扩展,但是swift不支持,虽然不知道他想具体怎么优化

/// Optimizations for map operator 优化map函数
internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
return _map(source: self, transform: transform)
}
}

上面注释写的很清楚了,主要是三个功能:

  • 便于内存管理,实现了当Observable初始化和销毁的时候,分别实现Resources.incrementTotal()方法和Resources.decrementTotal()方法
  • 实现了遵守的协议方法subscribe抽象方法,但是并没有具体实现,交给子类去实现
  • 实现了协议方法asObservable(),使其子类调用这个方法都能返回一个可观察序列Observable

类 - Producer

Producer继承自Observable,主要是具体实现了父类的订阅方法subscribe,并且提供了一个run方法,但是并没有具体实现,交给子类去实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}

override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired { // CurrentThreadScheduler 调度者
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

return disposer // 销毁
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

return disposer
}
}
}

func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
rxAbstractMethod() // 抽象方法,父类不实现,子类去实现
}
}

类 - AnonymousObservable

在创建序列的creat方法中返回的就是这么个玩意儿,里面保存了创建序列的回调,其中的run方法也会在序列的产生和订阅工程中使用到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable // 起个别名为SubscribeHandler

let _subscribeHandler: SubscribeHandler

init(_ subscribeHandler: @escaping SubscribeHandler) { // 带了一个SubscribeHandler参数的初始化
self._subscribeHandler = subscribeHandler // 保存了订阅管理者
}

override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel) // 这里传进去了 订阅者 销毁者
let subscription = sink.run(self) // 这里的self就是 AnonymousObservable 这里传进去了 可观察序列,即 ob
return (sink: sink, subscription: subscription)
}
}

订阅继承链

在订阅方法中,我们会创建一个AnonymousObserver对象,并保存当前函数的参数闭包(EventHandler),下面我们至上而下分析AnonymousObserver继承链的各个类。

1
类AnonymousObserver --> 类ObserverBase --> 协议Disposable,协议ObserverType

协议 - ObserverType

主要是提供了一个所有订阅者都会有的事件处理on方法。以及扩展的序列会出现的NextCompletedError三种状态,只不过这里on的方法是遵守这个协议的类的子类实现的on方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public protocol ObserverType {
func on(_ event: Event<E>)
}

extension ObserverType {
public func onNext(_ element: E) {
self.on(.next(element)) // 这里的on,是遵守了ObserverType协议的 AnonymousObservableSink的on方法 即link来处理结果
}

public func onCompleted() {
self.on(.completed) // link来处理结果
}

public func onError(_ error: Swift.Error) {
self.on(.error(error)) // link来处理结果
}
}

协议 - Disposable

提供了一个释放资源,即垃圾回收的方法。(这个后续详细分析其内部实现)

1
2
3
4
public protocol Disposable {
/// Dispose resource.
func dispose()
}

类 - ObserverBase

所有订阅者的基类.

  • 提供了onCore方法,但是并没有具体实现,交给子类具体实现
  • 实现on方法,根据序列状态next,error,completed,调用onCore方法,并将事件传过去,交由子类具体实现
  • 实现了垃圾回收的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType

private let _isStopped = AtomicInt(0)


func on(_ event: Event<E>) { // 这里的类型必须是前面传进来的类型,因为swift是一门强类型语言
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}

func onCore(_ event: Event<E>) {
rxAbstractMethod() // 抽象方法,交给子类具体实现
}

func dispose() {
fetchOr(self._isStopped, 1) // 当计数为0的时候,就回收
}
}

类 - AnonymousObserver

  • 在类初始化的时候,保存了_eventHandler对象
  • 和Observable一样,分别实现Resources.incrementTotal()方法和Resources.decrementTotal()方法来管理内存
  • 重写onCore方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType

typealias EventHandler = (Event<Element>) -> Void // 起别名

private let _eventHandler : EventHandler

init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._eventHandler = eventHandler // 保存了事件管理者,保存的即是前面AnonymousObserver大括号内的内容

}

override func onCore(_ event: Event<Element>) {
return self._eventHandler(event) //
}

#if TRACE_RESOURCES
deinit {
_ = Resources.decrementTotal()
}
#endif
}

总结

RxSwift的源码中可以很好的体现出swift面向协议编程的思想,每个协议都有自己独有的方法,并且每个类可以遵循多个协议,很好的完成了功能拓展,更好的职责划分,更加模块化。

使用了AnonymousObservableSink中间层来实现可观察序列AnonymousObservable和订阅者AnonymousObserver之间的通信。类似于其他框架中的manager

也可以在每个基类的初始化init和销毁方法中deinit实现监控内存管理的方法。