RXSwift 如何选择操作符?

RXSwift 如何选择操作符?

Posted by WTJ on January 2, 2021

1、我想要创建一个 Observable

产生特定的一个元素:just

创建 Observable 发出唯一的一个元素

just

just 操作符将某一个元素转换为 Observable。

演示

一个序列只有唯一的元素 0:

let id = Observable.just(0)

它相当于:

let id = Observable<Int>.create { observer in
    observer.onNext(0)
    observer.onCompleted()
    return Disposables.create()
}

经过一段延时:timer

创建一个 Observable 在一段延时后,产生唯一的一个元素 timer

timer 操作符将创建一个 Observable,它在经过设定的一段时间后,产生唯一的一个元素。

这里存在其他版本的 timer 操作符。

timer1-2

创建一个 Observable 在一段延时后,每隔一段时间产生一个元素

public static func timer(
  _ dueTime: RxTimeInterval,  // 初始延时
  period: RxTimeInterval?,    // 时间间隔
  scheduler: SchedulerType
  ) -> Observable<E>

从一个序列拉取元素:from

将其他类型或者数据结构转换为 Observable

from

当你在使用 Observable 时,如果能够直接将其他类型转换为 Observable,这将是非常省事的。from 操作符就提供了这种功能。

演示:

将一个数组转换为 Observable:

let numbers = Observable.from([0, 1, 2])

它相当于:

let numbers = Observable<Int>.create { observer in
    observer.onNext(0)
    observer.onNext(1)
    observer.onNext(2)
    observer.onCompleted()
    return Disposables.create()
}

将一个可选值转换为 Observable:

let optional: Int? = 1
let value = Observable.from(optional: optional)

它相当于:

let optional: Int? = 1
let value = Observable<Int>.create { observer in
    if let element = optional {
        observer.onNext(element)
    }
    observer.onCompleted()
    return Disposables.create()
}

重复的产生某一个元素:repeatElement

创建重复发出某个元素的 Observable

repeatElement

repeatElement 操作符将创建一个 Observable,这个 Observable 将无止尽地发出同一个元素。

演示:

创建重复发出 0 的 Observable

let id = Observable.repeatElement(0)

它相当于:

let id = Observable<Int>.create { observer in
    observer.onNext(0)
    observer.onNext(0)
    observer.onNext(0)
    observer.onNext(0)
    ... // 无数次
    return Disposables.create()
}

存在自定义逻辑:create

通过一个构建函数完整的创建一个 Observable

create

create 操作符将创建一个 Observable,你需要提供一个构建函数,在构建函数里面描述事件(next,error,completed)的产生过程。

通常情况下一个有限的序列,只会调用一次观察者的 onCompleted 或者 onError 方法。并且在调用它们后,不会再去调用观察者的其他方法。

演示:

创建一个 [0, 1, … 8, 9] 的序列:

let id = Observable<Int>.create { observer in
    observer.onNext(0)
    observer.onNext(1)
    observer.onNext(2)
    observer.onNext(3)
    observer.onNext(4)
    observer.onNext(5)
    observer.onNext(6)
    observer.onNext(7)
    observer.onNext(8)
    observer.onNext(9)
    observer.onCompleted()
    return Disposables.create()
}

每次订阅时产生:deferred

直到订阅发生,才创建 Observable,并且为每位订阅者创建全新的 Observable

deferred

deferred 操作符将等待观察者订阅它,才创建一个 Observable,它会通过一个构建函数为每一位订阅者创建新的 Observable。看上去每位订阅者都是对同一个 Observable 产生订阅,实际上它们都获得了独立的序列。

在一些情况下,直到订阅时才创建 Observable 是可以保证拿到的数据都是最新的。

每隔一段时间,发出一个元素:interval

创建一个 Observable 每隔一段时间,发出一个索引数

interval

interval 操作符将创建一个 Observable,它每隔一段设定的时间,发出一个索引数的元素。它将发出无数个元素。

在一段延时后:timer

一个空序列,只有一个完成事件:empty

创建一个空 Observable

empty

empty 操作符将创建一个 Observable,这个 Observable 只有一个完成事件。

演示:

创建一个空 Observable:

let id = Observable<Int>.empty()

它相当于:

let id = Observable<Int>.create { observer in
    observer.onCompleted()
    return Disposables.create()
}

一个任何事件都没有产生的序列:never

创建一个永远不会发出元素的 Observable

never

never 操作符将创建一个 Observable,这个 Observable 不会产生任何事件。

演示:

创建一个不会产生任何事件的 Observable:

let id = Observable<Int>.never()

它相当于:

let id = Observable<Int>.create { observer in
    return Disposables.create()
}

2、我想要创建一个 Observable 通过组合其他的 Observables

任意一个 Observable 产生了元素,就发出这个元素:merge

将多个 Observables 合并成一个

merge

通过使用 merge 操作符你可以将多个 Observables 合并成一个,当某一个 Observable 发出一个元素时,他就将这个元素发出。

如果,某一个 Observable 发出一个 onError 事件,那么被合并的 Observable 也会将它发出,并且立即终止序列。

演示:

let disposeBag = DisposeBag()

let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()

Observable.of(subject1, subject2)
    .merge()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject1.onNext("🅰️")

subject1.onNext("🅱️")

subject2.onNext("①")

subject2.onNext("②")

subject1.onNext("🆎")

subject2.onNext("③")

输出结果:

🅰️
🅱️
①
②
🆎
③

让这些 Observables 一个接一个的发出元素,当上一个 Observable 元素发送完毕后,下一个 Observable 才能开始发出元素:concat

让两个或多个 Observables 按顺序串连起来

concat

concat 操作符将多个 Observables 按顺序串联起来,当前一个 Observable 元素发送完毕后,后一个 Observable 才可以开始发出元素。

concat 将等待前一个 Observable 产生完成事件后,才对后一个 Observable 进行订阅。如果后一个是“热” Observable ,在它前一个 Observable 产生完成事件前,所产生的元素将不会被发送出来。

startWith 和它十分相似。但是 startWith 不是在后面添加元素,而是在前面插入元素。

merge 和它也是十分相似。merge 并不是将多个 Observables 按顺序串联起来,而是将他们合并到一起,不需要 Observables 按先后顺序发出元素。

演示:

let disposeBag = DisposeBag()

let subject1 = BehaviorSubject(value: "🍎")
let subject2 = BehaviorSubject(value: "🐶")

let subject = BehaviorSubject(value: subject1)

subject
    .asObservable()
    .concat()
    .subscribe { print($0) }
    .disposed(by: disposeBag)

subject1.onNext("🍐")
subject1.onNext("🍊")

subject.onNext(subject2)

subject2.onNext("I would be ignored")
subject2.onNext("🐱")

subject1.onCompleted()

subject2.onNext("🐭")

输出结果:

next(🍎)
next(🍐)
next(🍊)
next(🐱)
next(🐭)

组合多个 Observables 的元素

当每一个 Observable 都发出一个新的元素:zip

通过一个函数将多个 Observables 的元素组合起来,然后将每一个组合的结果发出来

zip

zip 操作符将多个(最多不超过8个) Observables 的元素通过一个函数组合起来,然后将这个组合的结果发出来。它会严格的按照序列的索引数进行组合。例如,返回的 Observable 的第一个元素,是由每一个源 Observables 的第一个元素组合出来的。它的第二个元素 ,是由每一个源 Observables 的第二个元素组合出来的。它的第三个元素 ,是由每一个源 Observables 的第三个元素组合出来的,以此类推。它的元素数量等于源 Observables 中元素数量最少的那个。

演示:

let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<String>()

Observable.zip(first, second) { $0 + $1 }
          .subscribe(onNext: { print($0) })
          .disposed(by: disposeBag)

first.onNext("1")
second.onNext("A")
first.onNext("2")
second.onNext("B")
second.onNext("C")
second.onNext("D")
first.onNext("3")
first.onNext("4")

输出结果:

1A
2B
3C
4D
当任意一个 Observable 发出一个新的元素:combineLatest

当多个 Observables 中任何一个发出一个元素,就发出一个元素。这个元素是由这些 Observables 中最新的元素,通过一个函数组合起来的

combineLatest

combineLatest 操作符将多个 Observables 中最新的元素通过一个函数组合起来,然后将这个组合的结果发出来。这些源 Observables 中任何一个发出一个元素,他都会发出一个元素(前提是,这些 Observables 曾经都发出过元素)。

演示:

tips: 可与 zip 比较学习

let disposeBag = DisposeBag()

let first = PublishSubject<String>()
let second = PublishSubject<String>()

Observable.combineLatest(first, second) { $0 + $1 }
          .subscribe(onNext: { print($0) })
          .disposed(by: disposeBag)

first.onNext("1")
second.onNext("A")
first.onNext("2")
second.onNext("B")
second.onNext("C")
second.onNext("D")
first.onNext("3")
first.onNext("4")

输出结果:

1A
2A
2B
2C
2D
3D
4D

3、我想要转换 Observable 的元素后,再将它们发出来

对每个元素直接转换:map

通过一个转换函数,将 Observable 的每个元素转换一遍

map

map 操作符将源 Observable 的每个元素应用你提供的转换方法,然后返回含有转换结果的 Observable。

演示:

let disposeBag = DisposeBag()
Observable.of(1, 2, 3)
    .map { $0 * 10 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果:

10
20
30

转换到另一个 Observable:flatMap

将 Observable 的元素转换成其他的 Observable,然后将这些 Observables 合并

flatMap

flatMap 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。 然后将这些 Observables 的元素合并之后再发送出来。

这个操作符是非常有用的,例如,当 Observable 的元素本身拥有其他的 Observable 时,你可以将所有子 Observables 的元素发送出来。

演示:

let disposeBag = DisposeBag()
let first = BehaviorSubject(value: "👦🏻")
let second = BehaviorSubject(value: "🅰️")
let subject = BehaviorSubject(value: first)

subject.asObservable()
        .flatMap { $0 }
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

first.onNext("🐱")
subject.onNext(second)
second.onNext("🅱️")
first.onNext("🐶")

输出结果:

👦🏻
🐱
🅰️
🅱️
🐶

只接收最新的元素转换的 Observable 所产生的元素:flatMapLatest

将 Observable 的元素转换成其他的 Observable,然后取这些 Observables 中最新的一个

flatMapLatest

flatMapLatest 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。一旦转换出一个新的 Observable,就只发出它的元素,旧的 Observables 的元素将被忽略掉。

演示:

tips:与 flatMap 比较更容易理解

let disposeBag = DisposeBag()
let first = BehaviorSubject(value: "👦🏻")
let second = BehaviorSubject(value: "🅰️")
let subject = BehaviorSubject(value: first)

subject.asObservable()
        .flatMapLatest { $0 }
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

first.onNext("🐱")
subject.onNext(second)
second.onNext("🅱️")
first.onNext("🐶")

输出结果:

👦🏻
🐱
🅰️
🅱️

每一个元素转换的 Observable 按顺序产生元素:concatMap

将 Observable 的元素转换成其他的 Observable,然后将这些 Observables 串连起来

concatMap

concatMap 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。然后让这些 Observables 按顺序的发出元素,当前一个 Observable 元素发送完毕后,后一个 Observable 才可以开始发出元素。等待前一个 Observable 产生完成事件后,才对后一个 Observable 进行订阅。

演示:

let disposeBag = DisposeBag()

let subject1 = BehaviorSubject(value: "🍎")
let subject2 = BehaviorSubject(value: "🐶")

let subject = BehaviorSubject(value: subject1)

subject.asObservable()
        .concatMap { $0 }
        .subscribe { print($0) }
        .disposed(by: disposeBag)

subject1.onNext("🍐")
subject1.onNext("🍊")

subject.onNext(subject2)

subject2.onNext("I would be ignored")
subject2.onNext("🐱")

subject1.onCompleted()

subject2.onNext("🐭")

输出结果:

next(🍎)
next(🍐)
next(🍊)
next(🐱)
next(🐭)

基于所有遍历过的元素: scan

持续的将 Observable 的每一个元素应用一个函数,然后发出每一次函数返回的结果

scan 操作符将对第一个元素应用一个函数,将结果作为第一个元素发出。然后,将结果作为参数填入到第二个元素的应用函数中,创建第二个元素。以此类推,直到遍历完全部的元素。

这种操作符在其他地方有时候被称作是 accumulator。

演示:

let disposeBag = DisposeBag()

Observable.of(10, 100, 1000)
    .scan(1) { aggregateValue, newValue in
        aggregateValue + newValue
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果:

11
111
1111

4、我想要将产生的每一个元素,拖延一段时间后再发出:delay

将 Observable 的每一个元素拖延一段时间后发出

delay

delay 操作符将修改一个 Observable,它会将 Observable 的所有元素都拖延一段设定好的时间, 然后才将它们发送出来。

5、我想要将产生的事件封装成元素发送出来

将他们封装成 Event:[materialize](https://beeth0ven.github.io/RxSwift-Chinese-Documentation/content/decision_tree/materialize.html)

将序列产生的事件,转换成元素

materialize

通常,一个有限的 Observable 将产生零个或者多个 onNext 事件,然后产生一个 onCompleted 或者 onError 事件。

materialize 操作符将 Observable 产生的这些事件全部转换成元素,然后发送出来。

然后解封出来:dematerialize

dematerialize 操作符将 materialize 转换后的元素还原

dematerialize

6、我想要忽略掉所有的 next 事件,只接收 completed 和 error 事件:ignoreElements

忽略掉所有的元素,只发出 error 或 completed 事件

ignoreElements

ignoreElements 操作符将阻止 Observable 发出 next 事件,但是允许他发出 error 或 completed 事件。

如果你并不关心 Observable 的任何元素,你只想知道 Observable 在什么时候终止,那就可以使用 ignoreElements 操作符。

7、我想创建一个新的 Observable 在原有的序列前面加入一些元素:startWith

将一些元素插入到序列的头部

startWith

startWith 操作符会在 Observable 头部插入一些元素。

(如果你想在尾部加入一些元素可以用concat

演示:

let disposeBag = DisposeBag()

Observable.of("🐶", "🐱", "🐭", "🐹")
    .startWith("1")
    .startWith("2")
    .startWith("3", "🅰️", "🅱️")
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果:

3
🅰️
🅱️
2
1
🐶
🐱
🐭
🐹

8、我想从 Observable 中收集元素,缓存这些元素之后在发出:buffer

缓存元素,然后将缓存的元素集合,周期性的发出来

buffer

buffer 操作符将缓存 Observable 中发出的新元素,当元素达到某个数量,或者经过了特定的时间,它就会将这个元素集合发送出来。

9、我想将 Observable 拆分成多个 Observables:window

将 Observable 分解为多个子 Observable,周期性的将子 Observable 发出来

window

window 操作符和 buffer 十分相似,buffer 周期性的将缓存的元素集合发送出来,而 window 周期性的将元素集合以 Observable 的形态发送出来。

buffer 要等到元素搜集完毕后,才会发出元素序列。而 window 可以实时发出元素序列。

基于元素的共同特征:groupBy

将源 Observable 分解为多个子 Observable,并且每个子 Observable 将源 Observable 中“相似”的元素发送出来 groupBy

groupBy 操作符将源 Observable 分解为多个子 Observable,然后将这些子 Observable 发送出来。

它会将元素通过某个键进行分组,然后将分组后的元素序列以 Observable 的形态发送出来。

10、我想只接收Observable 中特定的元素

发出唯一的元素:single

限制 Observable 只有一个元素,否出发出一个 error 事件 single

single 操作符将限制 Observable 只产生一个元素。如果 Observable 只有一个元素,它将镜像这个 Observable 。如果 Observable 没有元素或者元素数量大于一,它将产生一个 error 事件。

11、我想重新从 Observable 中发出某些元素

通过判定条件过滤出一些元素:filter

仅仅发出 Observable 中通过判定的元素

filter

filter 操作符将通过你提供的判定方法过滤一个 Observable。

演示:

let disposeBag = DisposeBag()

Observable.of(2, 30, 22, 5, 60, 1)
          .filter { $0 > 10 }
          .subscribe(onNext: { print($0) })
          .disposed(by: disposeBag)

输出结果:

30
22
60

仅仅发出头几个元素:take

仅仅从 Observable 中发出头 n 个元素 take

通过 take 操作符你可以只发出头 n 个元素。并且忽略掉后面的元素,直接结束序列。

演示:

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .take(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果:

🐱
🐰
🐶

仅仅发出尾部的几个元素:takeLast

仅仅从 Observable 中发出尾部 n 个元素

takeLast

通过 takeLast 操作符你可以只发出尾部 n 个元素。并且忽略掉前面的元素。

演示:

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .takeLast(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果:

🐸
🐷
🐵

仅仅发出第 n 个元素:elementAt

只发出 Observable 中的第 n 个元素

elementAt

elementAt 操作符将拉取 Observable 序列中指定索引数的元素,然后将它作为唯一的元素发出。

演示:

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .elementAt(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果:

🐸

跳过头几个元素

跳过头 n 个元素:skip

跳过 Observable 中头 n 个元素

skip

skip 操作符可以让你跳过 Observable 中头 n 个元素,只关注后面的元素。

演示:

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .skip(2)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果:

🐶
🐸
🐷
🐵

跳过头几个满足判定的元素:skipWhile

skipWhileWithIndex

跳过 Observable 中头几个元素,直到元素的判定为否 skipWhile

skipWhile 操作符可以让你忽略源 Observable 中头几个元素,直到元素的判定为否后,它才镜像源 Observable。

演示:

let disposeBag = DisposeBag()

Observable.of(1, 2, 3, 4, 3, 2, 1)
    .skipWhile { $0 < 4 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果:

4
3
2
1
跳过某段时间内产生的头几个元素:skip
跳过头几个元素直到另一个 Observable 发出一个元素:skipUntil

忽略掉在第二个 Observable 产生事件后发出的那部分元素

takeUntil

takeUntil 操作符将镜像源 Observable,它同时观测第二个 Observable。一旦第二个 Observable 发出一个元素或者产生一个终止事件,那个镜像的 Observable 将立即终止。

演示:

let disposeBag = DisposeBag()

let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()

sourceSequence
    .takeUntil(referenceSequence)
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sourceSequence.onNext("🐱")
sourceSequence.onNext("🐰")
sourceSequence.onNext("🐶")

referenceSequence.onNext("🔴")

sourceSequence.onNext("🐸")
sourceSequence.onNext("🐷")
sourceSequence.onNext("🐵")

输出结果:

next(🐱)
next(🐰)
next(🐶)
completed

只取头几个元素

只取头几个满足判定的元素:takeWhile

takeWhileWithIndex

镜像一个 Observable 直到某个元素的判定为 false

takeWhile

takeWhile 操作符将镜像源 Observable 直到某个元素的判定为 false。此时,这个镜像的 Observable 将立即终止。

演示:

et disposeBag = DisposeBag()

Observable.of(1, 2, 3, 4, 3, 2, 1)
    .takeWhile { $0 < 4 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果:

1
2
3

只取某段时间内产生的头几个元素:take

仅仅从 Observable 中发出头 n 个元素

take

通过 take 操作符你可以只发出头 n 个元素。并且忽略掉后面的元素,直接结束序列。

演示:

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .take(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果:

🐱
🐰
🐶

只取头几个元素直到另一个 Observable 发出一个元素:takeUntil

忽略掉在第二个 Observable 产生事件后发出的那部分元素

takeUntil

takeUntil 操作符将镜像源 Observable,它同时观测第二个 Observable。一旦第二个 Observable 发出一个元素或者产生一个终止事件,那个镜像的 Observable 将立即终止。

演示:

let disposeBag = DisposeBag()

let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()

sourceSequence
    .takeUntil(referenceSequence)
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sourceSequence.onNext("🐱")
sourceSequence.onNext("🐰")
sourceSequence.onNext("🐶")

referenceSequence.onNext("🔴")

sourceSequence.onNext("🐸")
sourceSequence.onNext("🐷")
sourceSequence.onNext("🐵")

输出结果:

next(🐱)
next(🐰)
next(🐶)
completed

周期性的对 Observable 抽样:sample

不定期的对 Observable 取样

sample

sample 操作符将不定期的对源 Observable 进行取样操作。通过第二个 Observable 来控制取样时机。一旦第二个 Observable 发出一个元素,就从源 Observable 中取出最后产生的元素。

发出那些元素,这些元素产生后的特定的时间内,没有新的元素产生:debounce

过滤掉高频产生的元素

debounce

debounce 操作符将发出这种元素,在 Observable 产生这种元素后,一段时间内没有新元素产生。

直到元素的值发生变化,才发出新的元素:distinctUntilChanged

并提供元素是否相等的判定函数:distinctUntilChanged

阻止 Observable 发出相同的元素 distinctUntilChanged

distinctUntilChanged 操作符将阻止 Observable 发出相同的元素。如果后一个元素和前一个元素是相同的,那么这个元素将不会被发出来。如果后一个元素和前一个元素不相同,那么这个元素才会被发出来。

演示:

let disposeBag = DisposeBag()

Observable.of("🐱", "🐷", "🐱", "🐱", "🐱", "🐵", "🐱")
    .distinctUntilChanged()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果:

🐱
🐷
🐱
🐵
🐱

在开始发出元素时,延时后进行订阅:delaySubscription

进行延时订阅

delaySubscription

delaySubscription 操作符将在经过所设定的时间后,才对 Observable 进行订阅操作。

12、我想要从一些 Observables 中,只取第一个产生元素的 Observable:amb

在多个源 Observables 中, 取第一个发出元素或产生事件的 Observable,然后只发出它的元素

amb

当你传入多个 Observables 到 amb 操作符时,它将取其中一个 Observable:第一个产生事件的那个 Observable,可以是一个 next,error 或者 completed 事件。 amb 将忽略掉其他的 Observables。

13、我想评估 Observable 的全部元素

并且对每个元素应用聚合方法,待所有元素都应用聚合方法后,发出结果:reduce

持续的将 Observable 的每一个元素应用一个函数,然后发出最终结果

reduce

reduce 操作符将对第一个元素应用一个函数。然后,将结果作为参数填入到第二个元素的应用函数中。以此类推,直到遍历完全部的元素后发出最终结果。

这种操作符在其他地方有时候被称作是 accumulator,aggregate,compress,fold 或者 inject。

演示:

let disposeBag = DisposeBag()

Observable.of(10, 100, 1000)
    .reduce(1, accumulator: +)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果:

1111

并且对每个元素应用聚合方法,每次应用聚合方法后,发出结果:scan

14、我想把 Observable 转换为其他的数据结构:as…

15、我想在某个 Scheduler 应用操作符:subscribeOn

在某个 Scheduler 监听:observeOn

指定 Observable 在那个 Scheduler 发出通知

observeOn

ReactiveX 使用 Scheduler 来让 Observable 支持多线程。你可以使用 observeOn 操作符,来指示 Observable 在哪个 Scheduler 发出通知。

observeOn1

注意⚠️:一旦产生了 onError 事件, observeOn 操作符将立即转发。他不会等待 onError 之前的事件全部被收到。这意味着 onError 事件可能会跳过一些元素提前发送出去,如上图所示。

schedulers

subscribeOn 操作符非常相似。它指示 Observable 在哪个 Scheduler 发出执行。

默认情况下,Observable 创建,应用操作符以及发出通知都会在 Subscribe 方法调用的 Scheduler 执行。subscribeOn 操作符将改变这种行为,它会指定一个不同的 Scheduler 来让 Observable 执行,observeOn 操作符将指定一个不同的 Scheduler 来让 Observable 通知观察者。

如上图所示,subscribeOn 操作符指定 Observable 在那个 Scheduler 开始执行,无论它处于链的那个位置。 另一方面 observeOn 将决定后面的方法在哪个 Scheduler 运行。因此,你可能会多次调用 observeOn 来决定某些操作符在哪个线程运行。

16、我想要 Observable 发生某个事件时, 采取某个行动:do

当 Observable 产生某些事件时,执行某个操作

do

当 Observable 的某些事件产生时,你可以使用 do 操作符来注册一些回调操作。这些回调会被单独调用,它们会和 Observable 原本的回调分离。

17、我想要 Observable 发出一个 error 事件:error

创建一个只有 error 事件的 Observable error

error 操作符将创建一个 Observable,这个 Observable 只会产生一个 error 事件。

演示:

创建一个只有 error 事件的 Observable:

let error: Error = ...
let id = Observable<Int>.error(error)

它相当于:

let error: Error = ...
let id = Observable<Int>.create { observer in
    observer.onError(error)
    return Disposables.create()
}

如果规定时间内没有产生元素:timeout

如果源 Observable 在规定时间内没有发出任何元素,就产生一个超时的 error 事件

timeout

如果 Observable 在一段时间内没有产生元素,timeout 操作符将使它发出一个 error 事件。

17、我想要 Observable 发生错误时,优雅的恢复

如果规定时间内没有产生元素,就切换到备选 Observable :timeout

如果产生错误,将错误替换成某个元素 :catchErrorJustReturn

catchErrorJustReturn 操作符会将error 事件替换成其他的一个元素,然后结束该序列。

演示:

let disposeBag = DisposeBag()
let sequenceThatFails = PublishSubject<String>()

sequenceThatFails
    .catchErrorJustReturn("😊")
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)

输出结果:

next(😬)
next(😨)
next(😡)
next(🔴)
next(😊)
completed

如果产生错误,就切换到备选 Observable :catchError

从一个错误事件中恢复,将错误事件替换成一个备选序列

catchError

catchError 操作符将会拦截一个 error 事件,将它替换成其他的元素或者一组元素,然后传递给观察者。这样可以使得 Observable 正常结束,或者根本都不需要结束。

这里存在其他版本的 catchError 操作符。

演示:

let disposeBag = DisposeBag()

let sequenceThatFails = PublishSubject<String>()
let recoverySequence = PublishSubject<String>()

sequenceThatFails
    .catchError {
        print("Error:", $0)
        return recoverySequence
    }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)

recoverySequence.onNext("😊")

输出结果:

next(😬)
next(😨)
next(😡)
next(🔴)
Error: test
next(😊)

如果产生错误,就重试 :retry

如果源 Observable 产生一个错误事件,重新对它进行订阅,希望它不会再次产生错误

retry

retry 操作符将不会将 error 事件,传递给观察者,然而,它会从新订阅源 Observable,给这个 Observable 一个重试的机会,让它有机会不产生 error 事件。retry 总是对观察者发出 next 事件,即便源序列产生了一个 error 事件,所以这样可能会产生重复的元素(如上图所示)。

演示 1:

let disposeBag = DisposeBag()
var count = 1

let sequenceThatErrors = Observable<String>.create { observer in
    observer.onNext("🍎")
    observer.onNext("🍐")
    observer.onNext("🍊")

    if count == 1 {
        observer.onError(TestError.test)
        print("Error encountered")
        count += 1
    }

    observer.onNext("🐶")
    observer.onNext("🐱")
    observer.onNext("🐭")
    observer.onCompleted()

    return Disposables.create()
}

sequenceThatErrors
    .retry()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果:

🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
🐶
🐱
🐭

演示 2

let disposeBag = DisposeBag()
var count = 1

let sequenceThatErrors = Observable<String>.create { observer in
    observer.onNext("🍎")
    observer.onNext("🍐")
    observer.onNext("🍊")

    if count < 5 {
        observer.onError(TestError.test)
        print("Error encountered")
        count += 1
    }

    observer.onNext("🐶")
    observer.onNext("🐱")
    observer.onNext("🐭")
    observer.onCompleted()

    return Disposables.create()
}

sequenceThatErrors
    .retry(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

输出结果:

🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
Error encountered
Unhandled error happened: test
 subscription called from:

18、我创建一个 Disposable 资源,使它与 Observable 具有相同的寿命:using

创建一个可被清除的资源,它和 Observable 具有相同的寿命

using

通过使用 using 操作符创建 Observable 时,同时创建一个可被清除的资源,一旦 Observable 终止了,那么这个资源就会被清除掉了。

19、我创建一个 Observable,直到我通知它可以产生元素后,才能产生元素:publish

将 Observable 转换为可被连接的 Observable

publish

publish 会将 Observable 转换为可被连接的 Observable。可被连接的 Observable 和普通的 Observable 十分相似,不过在被订阅后不会发出元素,直到 connect 操作符被应用为止。这样一来你可以控制 Observable 在什么时候开始发出元素。

演示:

let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .publish()

_ = intSequence
    .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    _ = intSequence.connect()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
  _ = intSequence
      .subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}

DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
  _ = intSequence
      .subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}

输出结果:

Subscription 1:, Event: 0
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 3:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 3:, Event: 4
Subscription 1:, Event: 5
Subscription 2:, Event: 5
Subscription 3:, Event: 5
Subscription 1:, Event: 6
Subscription 2:, Event: 6
Subscription 3:, Event: 6
...

并且,就算是在产生元素后订阅,也要发出全部元素:replay

确保观察者接收到同样的序列,即使是在 Observable 发出元素后才订阅

replay

可被连接的 Observable 和普通的 Observable 十分相似,不过在被订阅后不会发出元素,直到 connect 操作符被应用为止。这样一来你可以控制 Observable 在什么时候开始发出元素。

replay 操作符将 Observable 转换为可被连接的 Observable,并且这个可被连接的 Observable 将缓存最新的 n 个元素。当有新的观察者对它进行订阅时,它就把这些被缓存的元素发送给观察者。

演示:

let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .replay(5)

_ = intSequence
    .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    _ = intSequence.connect()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
  _ = intSequence
      .subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}

DispatchQueue.main.asyncAfter(deadline: .now() + 8) {
  _ = intSequence
      .subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}

输出结果:

Subscription 1:, Event: 0
Subscription 2:, Event: 0
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 3:, Event: 0
Subscription 3:, Event: 1
Subscription 3:, Event: 2
Subscription 3:, Event: 3
Subscription 3:, Event: 4
Subscription 1:, Event: 5
Subscription 2:, Event: 5
Subscription 3:, Event: 5
Subscription 1:, Event: 6
Subscription 2:, Event: 6
Subscription 3:, Event: 6
...

并且,一旦所有观察者取消观察,他就被释放掉:refCount

将可被连接的 Observable 转换为普通 Observable

refCount

可被连接的 Observable 和普通的 Observable 十分相似,不过在被订阅后不会发出元素,直到 connect 操作符被应用为止。这样一来你可以控制 Observable 在什么时候开始发出元素。

refCount 操作符将自动连接和断开可被连接的 Observable。它将可被连接的 Observable 转换为普通 Observable。当第一个观察者对它订阅时,那么底层的 Observable 将被连接。当最后一个观察者离开时,那么底层的 Observable 将被断开连接。

通知它可以产生元素了:connect

通知 ConnectableObservable 可以开始发出元素了

publish

ConnectableObservable 和普通的 Observable 十分相似,不过在被订阅后不会发出元素,直到 connect 操作符被应用为止。这样一来你可以等所有观察者全部订阅完成后,才发出元素。

演示:

let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .publish()

_ = intSequence
    .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    _ = intSequence.connect()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
  _ = intSequence
      .subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}

DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
  _ = intSequence
      .subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}

输出结果:

Subscription 1:, Event: 0
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 3:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 3:, Event: 4
Subscription 1:, Event: 5
Subscription 2:, Event: 5
Subscription 3:, Event: 5
Subscription 1:, Event: 6
Subscription 2:, Event: 6
Subscription 3:, Event: 6
...

参考:

https://beeth0ven.github.io/RxSwift-Chinese-Documentation/content/decision_tree.html