Skip to content

17响应式编程:如何保证程序状态自动更新?

在 iOS 开发中,随着 App 功能不断增强,处理各种异步事件,保持程序状态实时更新,也变得越来越困难。

以 ViewController 来为例,我们需要处理许多异步事件,比如来自 Delegate 和 DataSource 的回调,来自 NotificationCenter 的通知消息,来自 View 的 Target-Action 事件,等等。

由于它们随机发生且可能来自不同的线程,本身就会比较复杂,再加上其他新事件的引入,代码处理的逻辑会呈指数式增长。那么,怎样才能从根本上解决这些问题呢?这一讲我们所介绍的响应式编程就可以解决

响应式编程与 RxSwift

所谓响应式编程,就是使用异步数据流(Asynchronous data streams)进行编程。在传统的指令式编程语言里,代码不仅要告诉程序做什么,还要告诉程序什么时候做。而在响应式编程里,我们只需要处理各个事件,程序会自动响应状态的更新。而且,这些事件可以单独封装,能有效提高代码复用性并简化错误处理的逻辑。

现在,响应式编程已慢慢成为主流的编程范式,比如 Android 平台的 Architecture Components 提供了支持响应式编程的 LiveData, SwiftUI 也配套了 Combine 框架。在 Moments App 中,我采用的也是响应式编程模式。

目前比较流行的响应式编程框架有 ReactiveKit、ReactiveSwift 和 Combine。在这里,我们推荐使用RxSwift。因为 RxSwift 遵循了 ReactiveX 的 API 标准,由于 ReactiveX 提供了多种语言的实现,学会 RxSwift 能有效把知识迁移到其他平台。还有 RxSwift 项目非常活跃,也比较成熟。更重要的是,RxSwift 提供的 RxCocoa 能帮助我们为 UIKit 扩展响应式编程的能力,而 Combine 所对应的 CombineCocoa 还不成熟。

为了让 App 可以自动更新状态,我们在 Moments App 里面使用 RxSwift 把 MVVM 各层连接起来。

从上图可以看出,当用户打开朋友圈页面,App 会使用后台排程器 向 BFF 发起一个网络请求,Networking 模块把返回结果通过Observable 序列 发送给 Repository 模块。Repository 模块订阅接收后,把数据发送到Subject 里面,然后经过map 操作符 转换,原先的 Model 类型转换成了 ViewModel 类型。 ViewModel 模块订阅经过操作符转换的数据,发送给下一个Subject ,之后,这个数据被 ViewController 订阅,并通过主排程器更新了 UI。

整个过程中,Repository 模块、 ViewModel模块、ViewController 都是订阅者,分别接收来自前一层的信息。就这样,当 App 得到网络返回数据时,就能自动更新每一层的状态信息,也能实时更新 UI 显示。

这其中的 Observable 序列、订阅者、Subject 、操作符、排程器属于 RxSwift 中的关键概念,它们该如何理解,如何使用呢?接下来我就一一介绍下。

异步数据序列 Observable

为了保证程序状态的同步,我们需要把各种异步事件都发送到异步数据流里,供响应式编程发挥作用。在 RxSwfit 中,异步数据流称为 Observable 序列,它表示可观察的异步数据序列,也可以理解为消息发送序列。

在实际应用中,我们通常使用 Observable 序列作为入口,把外部事件连接到响应式编程框架里面。比如在 Moments App ,我通过 Observable 把网络请求的结果连接进 MVVM 架构中。

那么怎样创建 Observable 序列呢?为方便我们生成 Observable 序列, RxSwfit 的Observable类型提供了如下几个工厂方法:

  • just方法,用于生成只有一个事件的 Observable 序列;

  • of方法,生成包含多个事件的 Observable 序列;

  • from方法,和of方法一样,from方法也能生成包含多个事件的 Observable 序列,但它只接受数组为入口参数。

以下是相关代码示例。

swift
let observable1: Observable<Int> = Observable.just(1) // 序列包含 1
let observable2: Observable<Int> = Observable.of(1, 2, 3) // 序列包含 1, 2, 3 
let observable3: Observable<Int> = Observable.from([1, 2, 3]) // 序列包含 1, 2, 3
let observable4: Observable<[Int]> = Observable.of([1, 2, 3]) // 序列包含 [1, 2, 3]

当你需要生成只有一个事件的 Observable 序列时,可以使用just方法,如observable1只包含了1

当需要生成包含多个事件的 Observable 序列时,可以使用of或者from方法。它们的区别是,of接收多个参数而from只接收一个数组。如上所示,我们分别使用了offrom方法来生成observable2observable3,它们都包含了 1、2 和 3 三个事件。

这里需要注意,of方法也能接收数组作为参数的。与from方法会拆分数组为独立元素的做法不同,of方法只是把这个数组当成唯一的事件,例如observable4只包含值为[1, 2, 3]的一个事件。

在开发当中,Observable 序列不仅仅存放数值,比如 Moments App 的异步数据流就需要存放朋友圈信息来更新 UI,Observable也支持存放任意类型的数据。像在下面的例子中,peopleObservable就存放了两条类型为Person的数据,其中 Jake 的收入是 10 而 Ken 的收入是 20。

swift
struct Person {
    let name: String
    let income: Int
}
let peopleObservable = Observable.of(Person(name: "Jake", income: 10), Person(name: "Ken", income: 20))

订阅者

在响应式编程模式里,订阅者是一个重要的角色。在 Moments App 里面,上层模块都担任订阅者角色,主要订阅下层模块的 Observable 序列。那订阅者怎样才能订阅和接收数据呢?

在 RxSwift 中,订阅者可以调用Observable对象的subscribe方法来订阅。如下所示。

swift
let observable = Observable.of(1, 2, 3)
observable.subscribe { event in
    print(event)
}

订阅者调用subscribe方法订阅observable,并接收事件,当程序执行时会打印以下信息:

java
next(1)
next(2)
next(3)
completed

你可能会问上面的nextcompleted是什么呢?其实它们都是事件,用来表示异步数据流上的一条信息。RxSwift 使用了Event枚举来表示事件,定义如下。

swift
public enum Event<Element> {
    /// Next element is produced.
    case next(Element)
    /// Sequence terminated with an error.
    case error(Swift.Error)
    /// Sequence completed successfully.
    case completed
}
  • .next(value: T):用于装载数据的事件。当 Observable 序列发送数据时,订阅者会收到next事件,我们可以从该事件中取出实际的数据。

  • .error(error: Error):用于装载错误事件。当发生错误的时候,Observable 序列会发出error事件并关闭该序列,订阅者一旦收到error事件后就无法接收其他事件了。

  • .completed:用于正常关闭序列的事件。当 Observable 序列发出completed事件时就会关闭自己,订阅者在收到completed事件以后就无法收到任何其他事件了。

怎么理解呢?下面我通过两个例子来介绍下。由于之前讲过的offrom等方法都不能发出errorcompleted事件 ,在这里我就使用了create方法来创建 Observable 序列。

首先我们看一下发送error事件的例子。

swift
Observable<Int>.create { observer in
    observer.onNext(1)
    observer.onNext(2)
    observer.onError(MyError.anError)
    observer.onNext(3)
    return Disposables.create()
}.subscribe { event in
    print(event)
}

在这个例子中,我们调用了create方法来生成一个 Observable 序列,该 Observable 发出next(1)next(2)errornext(3)事件。由于next(3)事件在错误事件之后,因此订阅者无法接收到next(3)事件。程序执行时会打印下面的日志。

java
next(1)
next(2)
error(anError)

接着我们看一下发送completed事件的例子。

swift
Observable<Int>.create { observer in
    observer.onNext(1)
    observer.onCompleted()
    observer.onNext(2)
    observer.onNext(3)
    return Disposables.create()
}.subscribe { event in
    print(event)
}

在这里,我调用create方法来生成一个 Observable 序列,该 Observable 发出了next(1)completednext(2)next(3)事件。因为next(2)next(3)都在完成事件之后发出的,所以订阅者也无法接收它们,程序执行时会打印如下的日志。

java
next(1)
completed

在现实生活中,当我们订阅了报刊时可以自己选择退订,却无法让发行方停刊。在 RxSwift 里面也一样,订阅者无法强行让 Observable 序列发出completed事件来关闭数据流。那订阅者该怎样取消订阅呢?

如果你仔细观察就会发现,subscribe方法返回的类型为Disposable的对象,我们可以通过调用该对象的dispose方法来取消订阅。

为了更好地理解dispose方法的作用和触发时机,我通过subscribe()方法来打印出各个事件,如下所示。

swift
let disposable = Observable.of(1, 2).subscribe { element in
    print(element) // next event
} onError: { error in
    print(error)
} onCompleted: {
    print("Completed")
} onDisposed: {
    print("Disposed")
}
disposable.dispose()

我们在onNext闭包里面处理next事件;在onError闭包里处理error事件;在onCompleted闭包里处理completed事件;而在onDisposed闭包里处理退订事件。

在这里,我们调用subscribe方法后,它又马上调用了dispose方法,因此程序会在调用onCompleted之后立刻调用onDisposed。其执行效果如下:

java
1
2
Completed
Disposed

假如我在订阅前调用delay方法,那么所有的事件都会延时两秒钟后才通知订阅者,代码如下:

swift
let disposableWithDelay = Observable.of(1, 2).delay(.seconds(2), scheduler: MainScheduler.instance).subscribe { element in
    print(element) // next event
} onError: { error in
    print(error)
} onCompleted: {
    print("Completed")
} onDisposed: {
    print("Disposed")
}
disposableWithDelay.dispose()

和上面没有延时的例子一样,我们在调用subscribe方法以后马上调用了dispose方法,由于 Observable 序列上所有事件还在延时等待中,程序会直接调用onDisposed并退订了disposableWithDelay序列,因此没办法再收到两秒钟后所发出的next(1)next(2)completed事件了。 其执行效果如下:

java
Disposed

在很多时候,订阅后马上退订并不是我们想要的结果,我们希望订阅者一直监听事件直到自身消亡的时候才取消订阅。那有什么好的办法能做到这一点呢?

RxSwift 为我们提供了DisposeBag类型,方便存放和管理各个Disposable对象。其用法也非常简单,只需调用Disposabledisposed(by:)方法即可。代码如下:

swift
let disposeBag: DisposeBag = .init()
Observable.just(1).subscribe { event in
    print(event)
}.disposed(by: disposeBag)
Observable.of("a", "b").subscribe { event in
    print(event)
}.disposed(by: disposeBag)

代码中的disposeBag存放了两个Disposable对象。当订阅者调用其deinit方法时,同时也会调用disposeBagdeinit方法。在这时候,disposeBag会取出存放的所有Disposable对象,并调用它们的dispose方法来取消所有订阅。

在实际情况下,我建议只需为一个订阅者定义一个disposeBag即可。例如 Repository 模块同时订阅了 Networking 模块和 DataStore 模块,但它只定义了一个disposeBag来管理所有的订阅。

事件中转 Subject

以上是如何生成、订阅和退订 Observable 序列。使用Observable的工厂方法所生成的对象都是"只读",一旦生成,就无法添加新的事件。但很多时候,我们需要往 Observable 序列增加事件,比如要把用户点击 UI 的事件添加到 Observable 中,或者把底层模块的事件加工并添加到上层模块的序列中。

那么,有什么好办法能为异步数据序列添加新的事件呢?RxSwift 为我们提供的 Subject 及其onNext方法可以完成这项操作。

具体来说,Subject作为一种特殊的 Observable 序列,它既能接收又能发送,我们一般用它来做事件的中转。在 Moments App 的 MVVM 架构里面,我们就大量使用 Subject 发挥这一作用。 比如,当 Repository 模块从 Networking 模块中接收到事件时,会把该事件转送到自身的 Subject 来通知 ViewModel,从而保证 ViewModel 的状态同步。

那么,都有哪些常见的 Subject 呢?一般有 PublishSubject、BehaviorSubject 和 ReplaySubject。它们的区别在于订阅者能否收到订阅前的事件。

  • PublishSubject:如果你想订阅者只收到订阅后的事件,可以使用 PublishSubject。

  • BehaviorSubject:如果你想订阅者在订阅时能收到订阅前最后一条事件,可以使用 BehaviorSubject。

  • ReplaySubject :如果你想订阅者在订阅 能收到订阅前的 N 条事件,那么可以使用 ReplaySubject。

在订阅以后,它们的行为都是一致的,当 Subject 发出error或者completed事件以后,订阅者将无法接收到新的事件。与之相关的详细的内容,我会在第 19 讲数据层架构里展开介绍。

操作符

操作符(Operator)是 RxSwift 另外一个重要的概念,它能帮助订阅者在接收事件之前把 Observable 序列中的事件进行过滤、转换或者合并。

例如在 Moments App 里面,我们使用 map 操作符把 Model 数据转换成 ViewModel 类型来更新 UI。这里的 map 操作符就属于转换操作符,能帮助我们从一种数据类型转变成另外一种类型。除了map ,compactMap 和 flapMap 也属于转换操作符。

此外还有 filter 和 distinctUntilChanged等过滤操作符,我们可以使用过滤操作符把订阅者不关心的事件给过滤掉。还有合并操作符如 startWith,concat,merge,combineLatest 和 zip,可用于组装与合并多个 Observable 序列。

除了上面提到过的常用操作符,RxSwift 还为我们提供了 50 多个操作符,那怎样才能学会它们呢?我推荐你到 rxmarbles.com 或者到 App Store 下载 RxMarbles App,然后打开各个操作符并修改里面的参数,通过输入的事件和执行的结果来理解这些操作的作用。在之后的第 20 讲,我也会详细介绍一些常用的操作符的用法,到时候可以留意哦。

排程器

保持程序状态自动更新之所以困难,很大原因在于处理并发的异步事件是一件烦琐的事情。为了方便处理来自不同线程的并发异步事件,RxSwift 为我们提供了排程器。它可以帮我们把繁重的任务调度到后台排程器完成,并能指定其运行方式(如是串行还是并发),也能保证 UI 的任务都在主线程上执行。

比如在 Moments App 里面,Networking 和 DataStore 模块都在后台排程器上执行,而 View 模块都在主排程器上执行。

根据串行或者并发来归类,我们可以把排程器分成两大类串行的排程器和并发的排程器

串行的排程器包括 CurrentThreadScheduler、MainScheduler、SerialDispatchQueueScheduler。

其中,CurrentThreadScheduler 可以把任务安排在当前的线程上执行,这是默认的排程器。当我们不指定排程器的时候,RxSwift 都会使用 CurrentThreadScheduler 把任务放在当前线程里串行执行;MainScheduler 是把任务调度到主线程MainThread里并马上执行,它主要用于执行 UI 相关的任务;而SerialDispatchQueueScheduler 则会把任务放在dispatch_queue_t里面并串行执行。

并发的排程器包括 ConcurrentDispatchQueueScheduler 和 OperationQueueScheduler。

其中,ConcurrentDispatchQueueScheduler 把任务安排到dispatch_queue_t里面,且以并发的方式执行。该排程器一般用于执行后台任务,例如网络访问和数据缓存等等。在创建的时候,我们可以指定DispatchQueue的类型,例如使用ConcurrentDispatchQueueScheduler(qos: .background)来指定使用后台线程执行任务。

OperationQueueScheduler 是把任务放在NSOperationQueue里面,以并发的方式执行。这个排程器一般用于执行繁重的后台任务,并通过设置maxConcurrentOperationCount来控制所执行并发任务的最大数量。它可以用于下载大文件。

那么,如何用排程器进行调度,处理好不同线程的并发异步事件呢?请看下面的代码实现。

swift
Observable.of(1, 2, 3, 4)
    .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
    .dumpObservable()
    .map { "\(getThreadName()): \($0)" }
    .observeOn(MainScheduler.instance)
    .dumpObserver()
    .disposed(by: disposeBag)

首先我们传入ConcurrentDispatchQueueScheduler(qos: .background)来调用subscribeOn方法,把 Observable 序列发出事件的执行代码都调度到后台排程器去执行。然后通过传入MainScheduler.instance来调用observeOn,把订阅者执行的逻辑都调度主排程器去执行。

这是一种常用的模式,我们通常使用后台排程器来进行网络访问并处理返回数据,然后通过主排程器把数据呈现到 UI 中去。

由于后台线程不能保证执行的顺序,其执行效果如下,当你执行的时候可能会有点变化。

java
[Observable] 1 emitted on Unnamed Thread
[Observable] 2 emitted on Unnamed Thread
[Observer] Unnamed Thread: 1 received on Main Thread
[Observable] 3 emitted on Unnamed Thread
[Observer] Unnamed Thread: 2 received on Main Thread
[Observable] 4 emitted on Unnamed Thread
[Observer] Unnamed Thread: 3 received on Main Thread
[Observer] Unnamed Thread: 4 received on Main Thread

总结

在这一讲中我们介绍了 RxSwift 的五个关键概念:Observable 序列、订阅者、Subject、操作符以及排程器。我把本讲的代码都放在 Moments App 项目中的RxSwift Playground 文件里面,希望你能多练习,把五个概念融会贯通。

以下是我在实际工作中使用 RxSwift 的一些经验总结,希望能帮助到你。

  1. 当我们拿到需求的时候,先把任务进行分解,找出哪个部分是事件发布者 ,哪部分是事件订阅者,例如一个新功能页面,网络请求部分一般是事件发布者,当得到网络请求的返回结果时会发出事件,而 UI 部分一般为事件订阅者,通过订阅事件来保持 UI 的自动更新。

  2. 找到事件发布者以后,要分析事件发布的频率与间隔。如果只是发布一次,可以使用Obervable ;如果需要多次发布,可以使用Subject;如果需要缓存之前多个事件,可以使用 ReplaySubject。

  3. 当我们有了事件发布者和订阅者以后,接着可以分析发送和订阅事件的类型差异,选择合适的操作符来进行转换。我们可以先使用本讲中提到的常用操作符,如果它们还不能解决你的问题,可以查看 RxMarbles 来寻找合适的操作符。

  4. 最后,我们可以根据事件发布者和订阅者所执行的任务性质,通过排程器进行调度。例如把网络请求和数据缓存任务都安排在后台排程器,而 UI 更新任务放在主排程器。

我在后面几讲中会详细介绍如何把 RxSwift 应用到在 MVVM 架构来保证程序状态信息的自动更新。希望能帮助你把今天所学知识灵活应用到真实场景中。

思考题

据我所知,很多 iOS 开发者都想学习响应式编程和 RxSwift,但也不少人最终放弃了,如何你也曾经学习过并放弃了,请分享一下你的经验,哪一部分使你放弃学习和使用 RxSwift 呢?

可以把你的想法写得留言区哦,下一讲我将介绍如何设计网络访问与 JSON 数据解析。

源码地址:

RxSwift Playground 文件地址:https://github.com/lagoueduCol/iOS-linyongjian/blob/main/Playgrounds/RxSwiftPlayground.playground/Contents.swift