RxJava 教程第三部分:驯服数据流之自定义操作函数

作者: rain 分类: 移动 发布时间: 2016-04-25 22:57 6 条评论

RxJava 提供了很多操作函数。加上各种重载函数,一共有 300 多个操作函数。这些函数中只有很少一部分是核心的操作函数,离开这些核心的函数根本就没法使用 RxJava 了。其他的大部分函数只是一些便捷函数,方便开发者使用,并且他们的名字基本都说明了他们的用法。比如 如果操作函数 source.First(user -> user.isOnline()) 不存在,则我们依然可以使用 source.filter(user -> user.isOnline()).First() 来实现同样的功能。

尽管提供了 300 多个操作函数,但这些也都是很基本的操作。 Rx 提供了基础的功能,在此之上可以建构更加复杂的功能。最终你会遇到定义可重用代码的地方。 在 标准 Java 中重用代码是通过类和函数来实现,而在 Rx 中,是通过自定义操作函数来实现代码重用。例如,在您的程序中,计算数字流的平均数可能经常使用。但是 Observable 中并没有该函数,你可以自定义一个:

上面的代码实现了功能,但是没法重用。在标准的 Java 中可能会定义一个可以处理各种数据的函数,所以 一般的 Java 开发者可能一开始想到用一个函数来实现:

然后就可以重用了:

结果:

由于上面的代码很简单,所以看起来还不错。如果我们用自定义的操作函数做一些复杂的操作。例如,源 Observable 为一个句子,把这个句子分割没每个单词,并且把每个单词的长度作为数字的输入:

上面的代码可以正常使用,但是看起来不是纯 Rx 实现。如果每个 Rx 中的函数都是这样实现的,则最终多个操作函数一起使用就变成这样了:

这样我们在倒着处理数据流! (^o^)/~

把操作函数串联起来

Rx 中操作函数是通过串联调用的方式来使用的,而不是嵌套调用。这种用法在 Java 中也很常见,每个函数都返回该对象本身,这样就可以一直调用多个函数。例如 strings 对象:

通过这种方式,可以直观的看到对数据修改的顺序,如果用了多个操作函数看起来也更加简洁。

理想情况应该让你的自定义操作函数和标准的操作函数一样,可以串联的调用。

很多语言都直接支持该特性。但是 Java 并不直接支持。你不得不修改 Observable 的代码来添加你的操作函数。但是你没法告诉 RxJava 开发团队,让他们把你专用的操作函数给添加到 RxJava 标准类库中。虽然可以通过继承 Observable 的方式来添加你的操作函数,但是这样做也没法和标准的操作函数组合使用了。

compose

RxJava 提供了 compose 函数可以解决该问题。

一个 Transformer 接口。Transformer<T,R> 接口其实只是 Func1<Observable,Observable> 接口的另外一种简化形式。这是一个函数,把参数 Observable 转换为 Observable, 和我们计算平均数的实现是一样的:

在 Java 中没法直接引用函数的名字,上面示例中,我们假设自定义的操作函数在 Main 类中定义。这样自定义的操作函数就融合到串联调用中了,只不过需要先调用 compose 函数。通过在新的类中实现 Observable.Transformer 接口可以实现更好的封装:

然后可以这样使用:

大部分的 Rx 操作函数都是有参数的,我们也可以支持参数。比如:

这样我们就可以调用 source.compose(new RunningAverage(5)) 了。由于 Java 语言的限制,我们没法进一步优化这个使用情况了。这里有一个更加复杂的自定义操作函数的示例

lift

一般而言,Rx 操作函数都做三件事:
1. 订阅到源 Observable上并观察他们发生的数据
2. 根据操作函数的目的来转换数据流
3. 通过调用 onNext、 onError 和 onCompleted 函数 把转换后的数据发射给自己的订阅者。

compose 的参数为一个函数,该函数把一个 Observable 转换为另外一个 Observable。并且需要手工的完成上面3步操作。并且假设你可以使用已有的操作函数完成转换。如果没有对应的操作函数,则需要使用传统的面向对象的方式来处理。这样你需要从数据流中提取转换数据后重新发射出去。Observable.Transformer 通过订阅到源 Observable 上来实现这个功能。

自定义多个操作函数以后,你会发现,很多模板代码每次都需要编写,如果进入底层代码的话,有些模板代码可以省略。 lift 操作函数和 compose 类似, 区别是 转换的是一个 Subscriber 对象,而不是 Observable。

Observable.Operator<R,T> 是 Func1<Subscriber<? super R>,Subscriber<? super T>> 的变体, 是一个函数用来把一个 Subscriber 转换为 Subscriber。直接和 Subscriber 打交道可以避免访问 Observable。 lift 函数自动创建 Observable 并订阅。

如果你研究一下这个函数,可以发现好像这个函数是倒着声明的:为了把 Observable 转换为 Observable,需要一个函数把 Subscriber 转换为 Subscriber。 为什么会这样呢? 还记得一个订阅者在串联调用的末尾订阅的,然后传递给源 Observable。也就是说, Subscription 是倒着操作的。每个操作函数收到一个 Subscription,并使用这个 Subscription 来创建一个新的 Subscription 来处理这个操作。

下面的示例中,重新自定义实现 map 操作函数:

map 操作函数需要一个参数把 T 转换为 R。上面 的实现中,transformer 干了这件事。关键点在于 call 函数的调用。我们收到了一个 Subscriber对象,该对象需要一个 R 类型数据。我们为个 Subscriber 创建了一个Subscriber 对象,并把 T 转换为 R 类型数据然后发射给 Subscriber。 lift 操作函数处理接受 Subscriber 的模板代码,并且使用 Subscriber 订阅到源 Observable上。

使用 Observable.Operator 和使用 Observable.Transformer 一样简单:

结果:

Java 构造函数无法推倒类型,所以可以用一个静态函数来实现该功能:

然后这样使用:

就像实现 Observable.Operator 中手动把数据发射给 Subscriber 一样,需要考虑如下情况:
– Subscriber 可以随时取消订阅,所以需要检查是否还在订阅着,如果取消订阅了则不发射数据
– 你需要遵守 Rx 的约定,调用 onNext 发射数据,依 onCompleted 或者 onError 来结束数据流
– 如果需要异步处理数据或者调度,则需要使用 Rx 的 Schedulers 。这样你的操作函数将是可测试的。

serialize

如果你无法确保自定义的操作符符合 Rx 的约定,例如从多个源异步获取数据,则可以使用 serialize 操作函数。 serialize 可以把一个不符合约定的 Observable 转换为一个符合约定的 Observable。

image

下面创建一个不符合约定的 Observable,并且订阅到该 Observable上:

结果:

先不管上面的 Observable 发射的数据,订阅结束的情况看起来符合 Rx 约定。 这是由于 subscribe 认为当前数据流结束的时候会主动结束这个 Subscription。但这并不意味着总是这样的。 还有一个函数为 unsafeSubscribe ,该函数不会自动取消订阅。

结果:

上面的示例最后就没有打印 Unsubscribed 字符串。

unsafeSubscribe 也不能很好的处理错误情况。所以该函数几乎没用。在文档中说:该函数应该仅仅在自定义操作函数中处理嵌套订阅的情况。 为了避免这种操作函数接受到不合法的数据流,我们可以在其上应用 serialize 操作函数:

结果:

尽管上面的代码中没有调用unsubscribe, 但是数据流事件依然符合约定。最后也收到了完成事件。

lift 函数的额外好处

标准的操作函数也用 lift 实现的,如果你的自定义操作函数也通过 lift 实现,则 lift 在运行的时候就变成了一个 hot 函数, JVM 在运行的时候会优化该函数的调用,性能会有所提升。

在 lift 和 compose 之间做选择

lift 和 compose 都是元操作符(meta-operators),用来把自定义的操作函数注射到串联调用中。这两种情况下,自定义操作符既可以用函数实现也可以用类实现:
– compose: Observable.Transformer 或者 Func<Observable, Observable>
– lift: Observable.Operator 或者 Func<Subscriber, Subscriber>

理论上,每个操作函数都可以实现 Observable.Operator 和 Observable.Transformer。如果选择是根据使用的便捷性和你想避免什么样的模板代码:
– 如果自定义的操作函数只是现有的操作函数的组合,则使用 compose 比较自然
– 如果自定义从操作函数需要从数据流中获取数据,并做一些处理后再次发射数据到数据流,则使用 lift 比较好。

本文出自 云在千峰,转载时请注明出处及相应链接。

本文永久链接: http://blog.chengyunfeng.com/?p=976

Ɣ回顶部