Reactive 框架实现原理

本文最后更新于 2021年4月4日 晚上

通过 ReactiveKit 学习典型的 Reactive 框架实现原理. 适合读者: 有响应式框架 API 使用经验, 并想要深入理解响应式框架实现原理和相关概念的读者.

摘自这个链接.

ReactiveKit 是一个轻量级的 Swift Framework, 用于响应式(reactive)和函数响应式编程.

这个框架兼容所有的苹果平台和 Linux. 如果开发 iOS 或 macOS 上的 app, 可以结合 Bond Framework 来使用 UIKit 和 AppKit 上的绑定, 响应式代理和数据源.

这个文档的主要作用是从实现的角度介绍这个框架. 在最后让读者能够对这个框架的原理有一个最佳的认识.

目录

  1. 简介
  2. Signal
  3. 将异步调用封装到 signal 中
  4. signal 的释放(dispose)
  5. signal 的变换(transform)
  6. 错误处理
  7. 创建简单的 signal
  8. 使用 bag 释放 signal
  9. 线程
  10. 绑定: 包括绑定 target 和 property
  11. 共享事件序列
  12. Subject
  13. 可连接(connectable) signal
  14. 追踪 signal 的状态
  15. Property
  16. 加载 signal: 消费 loading state, 对 loading signal 进行变换, loading property
  17. 其他常用模式: 由 next 事件触发动作(action), 组合(combine)多个 signal
  18. debugging

其他的还包括:

  1. 系统需求
  2. 安装: 使用 Pod 和 Carthage, SPM

简介

给定一个文本输入框, 由于外部输入, 引起输入框的状态变化. 状态变化可以用**状态变化序列(Signal)**来描述:

1
---[J]---[Ji]---[Jim]--->

有的地方也把序列称为 Observable, 因为序列始终都是要被外界使用, 而使用的唯一方式就是观察, 因此命名为”可观察”.

其他地方也有称序列为 Sequence, Signal, Stream. 不管名称如何, 它们意思都一样, 都表示可观察的随时间变化的事件流.

这个序列和普通的数组类似, 唯一的不同点是: 这个序列的元素随时间流转而产生, 而非所有元素一次性出现在内存中.

响应式编程的核心思想就是: 任何事物都可以表示为序列.

比如网络请求, 可以用下面的序列表示:

1
---[Response]--->

一次网络请求的结果是上面的响应序列, 序列中的元素有且只有一个, 就是该请求的响应.(虽然只有一个响应, 但仍然可以认为它构成一个序列, 该序列随时间推进, 在其中生成一个响应元素).

普通数组是有限的, 所以拥有 size 属性, 它代表该数组占用的内存多少. 当我们讨论随时间变化的序列时, 我们无法预知序列生命期内有多少事件生成, 但我们可以知道序列什么时刻会结束事件的生成.

为了获知序列结束, 我们引入一个特殊的事件: completion, 用于表示序列完成. 在完成事件之后, 序列不会再生成任何事件. 换句话说, 完成事件表示序列生命期结束.

可以使用竖线表示完成事件(completion), 如下所示:

1
---[J]---[Ji]---[Jim]---|--->

完成事件非常重要! 它的出现就预示着序列生命期结束, 这样我们就可以对序列的资源进行释放.

不过, 我们身处一个不完美的世界, 不可能所有的序列都正常结束. 因为随着时间推进, 系统肯定有出现错误的概率.

比如网络请求, 正常情况下获取服务器响应后, 网络请求序列就会结束. 但如果网络错误, 则会接收到错误, 此时我们可以引入另外一个特殊事件: 失败(failure)事件, 或者说错误(error)事件.

失败事件也表示序列的结束, 在产生失败事件后, 序列就不会再产生其他事件了.

经过上面的分析, 我们可以初步定义组成序列的事件:

1
2
3
4
5
6
7
8
9
10
11
12
/// 表示序列中的一个事件
public enum Event {

/// 普通事件, 关联一个特定类型的值
case next(Element)

/// 失败事件, 代表序列异常结束, 关联一个错误值
case failed(Error)

/// 完成事件, 代表序列正常结束, 没有关联值
case completed
}

为了实现上的方便, 将事件放到序列名空间下:

1
2
3
4
5
extension Signal {
public enum Event {
//...
}
}

故, 序列总是由零个或多个 next 事件, 附加一个 completionfailure 事件构成. 且在某个序列中, completionfailure 不会同时出现.

Signal 定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/// 代表事件的序列(事件流)
public protocol SignalProtocol {

/// 序列中生成的 next 事件携带的数据类型.
associatedtype Element

/// Failure 事件携带的错误的数据类型.
associatedtype Error: Swift.Error

/// 向序列注册观察者(换一种表达就是: 订阅这个序列)
/// - Parameter observer: 外部传入的函数, 这个函数可以接收事件(序列产生事件后, 会将事件作为参数传递给这个函数)
/// - Returns: 订阅关系(Disposable), 外界获取订阅关系后, 可以根据需要随时取消订阅.
public func observe(with observer: @escaping Observer<Element, Error>) -> Disposable
}

从上面的代码可知, 在序列上的一个最重要的操作就是 observe. 序列上的事件会被 Observer 接收, 而 Observer 的定义如下所示:

1
2
/// Represents a type that receives events.
public typealias Observer<Element, Error: Swift.Error> = (Signal<Element, Error>.Event) -> Void

下面就来看看 Signal 的具体实现.

Signal

在前面已经介绍了 SignalProtocol 的定义, 下面来看 Signal 的实现.

下面是 Signal 的最简实现:

1
2
3
4
5
6
7
8
9
10
11
12
public struct Signal<Element, Error: Swift.Error>: SignalProtocol {

private let producer: (Observer<Element, Error>) -> Void

public init(producer: @escaping (Observer<Element, Error>) -> Void) {
self.producer = producer
}

public func observe(with observer: @escaping Observer<Element, Error>) {
producer(observer)
}
}

其中 Element 类型表示事件元素类型, Error 表示错误事件元素携带的错误数据类型.

在其中有一个重要的属性 producer, 将 observer 传入 producer, 即可让 observer 在事件发生的时候得到通知: 在开始订阅过程后, producer 负责将事件传递给 observer.

比如下面的序列:

1
---[1]---[2]---[3]---|--->

在代码中表现出来是这样的:

1
2
3
4
5
6
7
8
9
10
let counter = Signal<Int, Never> { observer in

// send first three positive integers
observer(.next(1))
observer(.next(2))
observer(.next(3))

// send completed event
observer(.completed)
}

构造函数传入的这个函数就是 producer, 在 producer 的实现中将事件传递给 observer.

为了使用上的方便, 这里再定义 ObserverProtocol, 表示订阅者:

1
2
3
4
5
6
7
8
9
10
11
12
/// Represents a type that receives events.
public protocol ObserverProtocol {

/// Type of elements being received.
associatedtype Element

/// Type of error that can be received.
associatedtype Error: Swift.Error

/// Send the event to the observer.
func on(_ event: Signal<Element, Error>.Event)
}

这样 producer 可以使用 on 方法向 Observer 传入事件.

另外再定义一些其他的扩展方法用于传入事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public extension ObserverProtocol {

/// Convenience method to send `.next` event.
public func receive(_ element: Element) {
on(.next(element))
}

/// Convenience method to send `.failed` or `.completed` event.
public func receive(completion: Subscribers.Completion<Error>) {
switch completion {
case .finished:
on(.completed)
case .failure(let error):
on(.failed(error))
}
}

/// Convenience method to send `.next` event followed by a `.completed` event.
public func receive(lastElement element: Element) {
receive(element)
receive(completion: .finished)
}
}

通过这些 receive 方法来传入事件, 之前的代码就可以修改为:

1
2
3
4
5
6
7
8
9
10
let counter = Signal<Int, Never> { observer in

// send first three positive integers
observer.receive(1)
observer.receive(2)
observer.receive(3)

// send completed event
observer.receive(completion: .finished)
}

实际上和最开始的代码在写法上没有多少区别, 但这样做可以更易读, 且将不同类型的事件分别处理(next, error/completion).

通过上述过程, 就实现了一个序列, 以及预定义了序列中应该出现的元素以及元素的传递给 observer 的方式(producer 函数内部调用 observer 的 receive 方法).

下面只需要为这个序列添加实际的观察者, 即可形成完整的订阅:

1
2
3
counter.observe(with: { event in
print(event)
})

执行后输出如下:

1
2
3
4
next(1)
next(2)
next(3)
completed

将异步操作封装到序列中

响应式编程的主要用途就是把异步操作以一种可控的方式进行管理. 下面就来看看如何把网络操作封装到序列中.

比如我们又如下的方法:

1
func getUser(completion: (Result<User, ClientError>) -> Void) -> URLSessionTask

可以将它转换为下面的写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
func getUser() -> Signal<User, ClientError> {
return Signal { observer in
getUser(completion: { result in
switch result {
case .success(let user):
observer.receive(user)
observer.receive(completion: .finished)
case .failure(let error):
observer.receive(completion: .failure(error))
})
// return disposable, continue reading
}
}

经过上面的处理, 这段代码实际上已经是部分正常工作的了(还没有实现 dispose 过程, 后续介绍, 所以说是部分工作).

这个序列中可以是如下两种状态:

  • 正常获取响应并结束:

    1
    ---[User]---|--->
  • 异常结束:

1
---Error--->

添加观察者后, 便可以工作了:

1
2
3
4
5
let user = getUser()

user.observe { event in
print(event) // prints ".next(user), .completed" in case of successful response
}

细心的读者可以发现, 当没有调用序列的 observe 方法时, 序列是没有开始工作的, 因为 producer 的调用只在 observe 方法中进行! 在 producer 方法中才开始网络通信并获取结果. 这样就将之前以回调方式处理的网络请求转换成了以序列方式进行.

另外, 如果我们多次调用 observe, 则会触发多次网络请求(如果要只触发一次, 应该如何进行? 这就要看后面的 share 操作了).

释放 (Disposing) Signal

https://github.com/DeclarativeHub/ReactiveKit#disposing-signals

当某页面退出时, 经常需要把正在进行的网络请求取消掉, 上述 getUser 方法, 如果是原始的返回 URLSessionTask 那个, 则可以在 task 上调用取消, 但如果是现在这样返回 Signal 的, 如何取消呢?

其实, 之前的实现还漏掉一个关键内容, 就是 disposable 的实现, 在 observe 方法中返回的是一个 Disposable, 定义如下:

1
2
3
4
5
6
7
public protocol Disposable {
/// Cancel the signal observation and any underlying tasks.
func dispose()

/// Returns `true` if already disposed.
var isDisposed: Bool { get }
}

Disposable 有许多实现, 我们先关注其中用得最多的一种 BlockDisposable. 当 signal 被释放时常会进行一些清理工作, 清理工作通过一个块携带:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final class BlockDisposable: Disposable {

private var handler: (() -> Void)?

public var isDisposed: Bool {
return handler == nil
}

public init(_ handler: @escaping () -> Void) {
self.handler = handler
}

public func dispose() {
handler?()
handler = nil
}
}

可以看到, 当创建 BlockDisposable 时会传入一个块, dispose 的时候就是在执行这个块中的内容, 实际和 RxSwift 中带块参数的 Disposables.create 实现是类似的.

这样的话, 上述 Single 的实现就完整了:

1
2
3
4
5
6
7
8
9
10
11
12
public struct Signal<Element, Error: Swift.Error>: SignalProtocol {

private let producer: (Observer<Element, Error>) -> Disposable

public init(producer: @escaping (Observer<Element, Error>) -> Disposable) {
self.producer = producer
}

public func observe(with observer: @escaping Observer<Element, Error>) -> Disposable {
return producer(observer)
}
}

将上面实例化 Signal 时 producer 块的实现修改为如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func getUser() -> Signal<User, ClientError> {
return Signal { observer in
let task = getUser(completion: { result in
switch result {
case .success(let user):
observer.receive(user)
observer.receive(completion: .finished)
case .failure(let error):
observer.receive(completion: .failure(error))
})

return BlockDisposable {
task.cancel()
}
}
}

这样的话, 订阅者端就可以这样写了:

1
2
3
let disposable = getUser().observe { event in
print(event)
}

当页面退出时, 就可以调用 dispose 来释放订阅了:

1
disposable.dispose()

上述只是简单概念实现, 实际在 Kit 中有额外的措施来保障当 Signal 释放后, 不会再有事件产生. 另外当 Signal 产生结束事件后, 也会被自动释放.

Signal 的变换操作符

操作符(Operator)实际上是函数, 用于将 Signal 变换为另外一种 Signal. 比如基本的过滤操作符, 有一系列城市, 需要将其按首字母过滤:

1
2
3
4
5
filter(
---[Berlin]---[Paris]---[London]---[Porto]---|--->
)

--------------[Paris]--------------[Porto]---|--->

这样的操作符是如何实现的呢? 非常简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
extension SignalProtocol {
/// Emit only elements that pass `isIncluded` test.
public func filter(_ isIncluded: @escaping (Element) -> Bool) -> Signal<Element, Error> {
return Signal { observer in
return self.observe { event in
switch event {
case .next(let element):
if isIncluded(element) {
observer.receive(element)
}
default:
observer(event)
}
}
}
}
}

上述代码中, 创建了一个新的 Signal, producer 中将自己接收的事件在块中再次发送给新的 Signal 的 observer, 在发送的时候进行过滤操作.

使用时, 可以像下面这样:

1
2
3
cities.filter { $0.hasPrefix("P") }.observe { event in
print(event) // prints .next("Paris"), .next("Porto"), .completed
}

还有许多的操作符, 实际上框架中大部分代码就是这些操作符的实现.

比如下面这个常用的操作符, 只观察 next 事件:

1
2
3
4
5
6
7
8
9
10
11
extension SignalProtocol {

/// Register an observer that will receive elements from `.next` events of the signal.
public func observeNext(with observer: @escaping (Element) -> Void) -> Disposable {
return observe { event in
if case .next(let element) = event {
observer(element)
}
}
}
}

日常开发中如果只关注 next 事件, 则可以调用上面这个方法来订阅 Signal.

使用时这样写:

1
2
3
cities.filter { $0.hasPrefix("P") }.observeNext { name in
print(name) // prints "Paris", "Porto"
}

有了这样的概念后, 如果需要一些框架没有提供的操作符, 就可以自己去写了!

Never 错误类型

Error 会终止 Signal 的执行, 但在某些情况下, 不希望 Signal 中出现错误, 那应该如何处理呢?(比如视图中)

在 ReactiveKit 中有如下类型:

1
2
3
/// An error type that cannot be instantiated. Used to make signals non-failable.
public enum Never: Error {
}

这个错误类型不能被初始化, 因此, 从源头上保证了使用这一错误类型的 Signal 中永远不会存在错误.

比如使用如下代码:

1
2
3
4
5
let signal = Signal<Int, Never> { observer in
...
observer.failed(/* What do I send here? */)
...
}

这段代码中就永远无法发送 Never 类型的错误, 因为这样的错误无法被初始化.

而且绑定只会使用安全的(不会出现错误)的 Signal.

Signal 的创建

1
2
3
4
5
6
7
8
9
10
11
let signal = Signal<Int, Never>.just(5)
let signal = Signal<Int, Never>.sequence([1, 2, 3])
let signal = Signal<Int, Never>.completed()
let signal = Signal<Int, MyError>.failed(MyError.someError)
let signal = Signal<Int, Never>.never()

// 延时
let signal = Signal<Int, Never>(just: 5, after: 60)

// 每 5 秒产生一个数值
let signal = Signal<Int, Never>(sequence: 0..., interval: 5)

通过 bag 释放 Signal

原理上说, 就是当 bag 被释放的时候, 它会对其中所有的 Signal 调用 dispose 方法, 从而将它们都释放掉.

在实践时, 如果不想到处创建 bag, 可以

(bag 实现这里没有讲, 可以看看源码)

略过线程/binding的相关内容暂时.

共享 Signal 的 API 使用

还是没有说明如何实现的. 这里需要看源码来补充. 只是讲解了 API 的使用.

Loading Signal 的实践

这样的 Signal 在实践中经常用到, 可以参考其中的实现.


Reactive 框架实现原理
https://blog.rayy.top/2020/11/15/2020-11-15-reactive-imp/
作者
貘鸣
发布于
2020年11月15日
许可协议