【ReactiveX】Observable 对象(译)

title

Observable 对象

在 ReactiveX 中,一个观察者向 Observable 对象订阅消息。然后这个观察者将会响应 Observable 发射的消息任何对象或者对象队列。这个模式有利于并发操作,因为他不需要在等待 Observable 发射消息时候阻塞线程。而不是创建了一个观察者式的哨兵,随时准备着适时响应未来时间 Observable 对象发射消息。
这一页解释了什么是响应模式,什么是观察者和 Observable 对象(和观察者如何订阅 Observable 对象的)。其他页面解释了你应该如何使用各种 Observable 操作来将 Observable 对象关联上,然后改变他们的行为。
这个文档将解释集中展现为“marble diagrams”。下图就是代表 Observable 对象和 Observable 对象的转换过程:
12345
例如:

背景:

在很多软件程序任务中,你或多或少都会期望你编写的指令将会执行和完成的顺序是增量的,一个接一个的,按照你编写的顺序来执行。但是,在 ReactiveX 中,很多指令将会并行执行,他们的结果将会按照任意的顺序被延迟捕获。
不通过调用函数,你可以在 Observable 对象中定义一个装置来检索和转换数据,然后绑定一个观察者在其中,用来将一个之前定义的装置引发动作,观察者作为哨兵,当他们准备好,就可以捕获并且响应他们的释放。
这种方法的优势在于,,当你有多个任务,但是任务与任务之间并没有依赖关系,这样你就能够同步开始所有任务,而不是等待各自完成知乎在开始接下来的任务。如此,你的整个任务束的执行时间长度就和你最长的那个任务时间一样长。
有很多条款可以用来描述这种同步执行程序和设计。这份文档将会使用“一个观察者订阅了一个 Observable 对象”,一个 Observable 对象通过调用监听者的方法,来发射或者推送通知给它的监听者。
在其他文档和上线文中,我们叫做“观察者”的对象,有时候被叫做“订阅者 ”,“关注者”,或者“响应者”。这个模式总体上来说常常称作“响应者模式”

建立观察者

这个页面常规伪代码来举例,但是 ReactiveX 是在多种语言中实现的。
常规调用方式,并不是按照异步顺序,而是按照并行调用在 ReactiveX 中,流程如下:
1、调用一个方法。
2、用变量存储一个方法的返回值。
3、使用变量和它的新中去干活。
或者,像这样:
23456
在异步调用的模式中,流程更多的是像这样:
1、定义好一个方法,利用异步调用的返回值来做一些有用的事情,这个方法是观察的这一部分。
2、定义好一个异步调用作为一个 Observable 的对象。
3、通过订阅,将观察者绑定到 Observables 对象上去(这也初始化了 Observables 对象的行为)。
4、继续你的工作,直到有调用返回,Obsevables 对象发送数据,观察者的方法将会基于其数据进行相关操作。
像这样:
34567

继续、完成、错误

订阅者的方法能够让你将观察者连接上 Observable 对象。你的观察者实现如下函数中的一部分:
onNext:
当 Observable 对象发送数据的时候,该函数就会被调用。这个函数将数据作为一个参数携带。
onError:
Observable 对象产生想要的数据失败或者遇到其他错误的时候,就会调用该方法。这个函数将错误原因作为参数携带。
onCompleted:
如果没有遇到任何问题,当 Observable 对象调用完成 onNext 方法之后,最后就会调用 onCompleted 方法。
当连接 Observable 对象之后, onNext 将会被调用零次或者多次。然后接下来就会调用 onCompleted 或者 onError,但是并不会同时调用两者,这将会是最后的调用。我们约定,当调用 onNext 的时候,通常被称作 “emissions”,当调用 onCompleted 或者 onError 的时候,被称作 “notifications”。
更详细的订阅过程如下:
45678

取消订阅

在某些 ReactiveX 实现上,有一个特殊的接口—Subscriber,它实现了一个 unsubscribe 方法。你可以通过调用这个方法表明监听者不在关注该 Observable 对象发送的任何消息。这些 Observable 对象(如果他们没有其他观察者)就可以选择不在产生和发送新的数据。
这个 unsubscription 接口将会级联回调整个链上的操作,这些操作之前是观察者订阅在 Observable 对象上的。这也将会导致整个链上的所有连接都停止发送数据。这个操作并不能保证立即执行,然而,一个 Observable 对象也有可能在没有观察者订阅的情况下,仍旧产生和试图发送数据。

关于命名约定的提示

对于每个语言,ReactiveX 都有其特殊的实现方式。没有什么公用的命名规范,但是,这些实现方式都有一些共性。
此外,一些名称在其他上下文中有不同的意义,或者在其他特殊的语言实现中,看起来有些尴尬。
例如, onEvent 的命名模式(或者 onNext,onCompleted,onError)。在某些语境中,这表示事件已经被注册。但是在 ReactiveX 中,它代表事件本身。

“热订阅”与“冷订阅”

Observable 对象什么时候开始在他们的队列中发送数据?一个“热订阅”在产生数据的时候就会立刻把它发送出去,所以其他后续加入的订阅者将会从队列的中间接受到数据。一个“冷订阅”只会在一个订阅者订阅它之后,才会开始发送数据。所以,这类发送者能够保证看到完整的序列。
在某些 ReactiveX 的实现中,也存在一种“连接订阅”的 Observable 对象。这些对象只有在 Connect 函数1被调用的时候才会发送数据,无论被订阅没有。

订阅操作的组合

Observable 对象和 订阅者只是 ReactiveX 的开始。要想适用于一个事件队列而不是单一事件,最好能够将标准的订阅者模式进行轻微的改造才行。
真正的力量来自于“reactuve extension”(因此叫做 ReativeX)-操作,这些操作允许你转换、合并、操纵和控制Observable 对象发送的消息队列。
Rx 操作让你能够通过声明的方法组合异步队列,让你能够充分利用回调的便利性,同时,避免了通常和异步系统关联的嵌套回调的缺点。
这份文档将各种操作分组列下,对其使用方式举例:
Creating Observables
CreateDeferEmpty/Never/ThrowFromIntervalJustRangeRepeatStart, and Timer
Transforming Observable Items
BufferFlatMapGroupByMapScan, and Window
Filtering Observables
DebounceDistinctElementAtFilterFirstIgnoreElementsLastSampleSkipSkipLastTake, and TakeLast
Combining Observables
And/Then/WhenCombineLatestJoinMergeStartWithSwitch, and Zip
Error Handling Operators
Catch and Retry
Utility Operators
DelayDoMaterialize/DematerializeObserveOnSerializeSubscribeSubscribeOn,TimeIntervalTimeoutTimestamp, and Using
Conditional and Boolean Operators
AllAmbContainsDefaultIfEmptySequenceEqualSkipUntilSkipWhileTakeUntil, and TakeWhile
Mathematical and Aggregate Operators
AverageConcatCountMaxMinReduce, and Sum
Converting Observables
To
Connectable Observable Operators
ConnectPublishRefCount, and Replay
Backpressure Operators
a variety of operators that enforce particular flow-control policies
这些页面包含一些并不是 ReactiveX 核心功能,但是在一个或者多个语言或者可选模块中被实现的操作。

链式操作

Observable 对象的多数操作同样会返回一个 Observable 对象。允许你能够一个接一个的执行链式操作。每个操作都能够改变前一个操作返回的 Observable 对象。
其中也包括了其他模式,比如“Builder Pattern”,通过一个特殊的类上多个函数来操作数据。这些模式也让你能够将这些操作链式执行。但好似需要注意的是,执行顺序并不是按照这些操作的在链式操作上的编码先后顺序执行,而是按照 Observable 对象的操作顺序来执行。
链式操作并不是基于初始建链产生的 Observable 对象独立运行,而是都依照于链中紧邻的上一个操作产生的 Observable 对象运行。