把异步回调操作转换到 RxJava 中

作者: rain 分类: 移动 发布时间: 2016-09-22 00:41 6 条评论

Yammer 应用开发团队介绍了如何使用 RxJava v1.1.7 版本的 Observable.fromAsync() 函数来把异步回调操作数据发射到 RxJava 数据流中。

现有的 API 通常有同步阻塞 API 和异步非阻塞 API。通过 Observable.fromCallable() 函数可以把同步 API 封装为 Observable,

上面的示例中,使用 Observable.fromCallable 把针对 SharedPreferences 的操作封装为 Observable。

而在 Android 中还有很多异步回调场景,在网络上到处可以看到使用 Observable.create() 来封装异步调用的示例,比如 把现有的库封装为 Observable,在RxJava中封装异步 API,还有这里。但是,使用 Observable.create() 有很多缺点,后面会介绍这些缺点。

下面使用一个示例来看看如何封装异步 API。假设我们用 SensorManager 来追踪设备的加速度。 普通的实现方式是这样的:

下面是一种天真的封装为 RxJava Observable 的方式:

由于加速度感应器为 hot Observable,所以上面的封装,没有调用 subscriber.onCompleted() 函数,只要 Subscriber 没有 unsubscribed,则一直有数据产生。

然后使用上面封装的代码:

虽然上面的代码可以工作。但是距离一个符合 Observable 规范的实现来差很多, Observable 规范有如下几点:
1. 如果 Subscriber 取消注册了,则需要取消注册加速度的监听器来避免内存泄露
2. 需要捕获可能出现的异常,然后把异常传递到 Observable 的 onError() 函数来避免导致程序崩溃
3. 在调用 onNext() 或者 onError() 之前,需要判断 Subscriber 是否还在监听事件,避免发射不必要的数据
4. 需要处理 backpressure(数据发射的速度比 Subscriber 处理的速度要快的情况),防止 MissingBackpressureException 异常。

上面的 1~3 步,使用 Observable.create() 函数还是可以正确实现的,虽然麻烦一点:

而实现 4 就没有这么简单了,毕竟 RxJava 中的 Backpressure 都可以出一本书来详细介绍其内容了。每次封装异步 API 都需要手工实现 Backpressure 则是非常痛苦的,也不是常人可以正确做到的。

因此,在 RxJava v1.1.7 版本中,聪明的 RxJava 开发人员提供了一个新的函数:Observable.fromAsync() 来处理异步 Api 的封装。

注意:在 1.2.0 版本中,该函数被重命名为 Observable.fromEmitter()

使用 Observable.fromAsync() 【Observable.fromEmitter()】

该函数的文档还很简单,直接看示例代码更加容易理解如何使用该函数:

注意上面的实现中,并没有处理 第 2、3 步骤,Observable.fromAsync() 自动处理了该问题。 通过 setCancellation() 来实现 第 1 个步骤,而第 4 个步骤只要指定 backpressure 策略就可以了。

处理 Backpressure

Backpressure 是用来描述,生产者生产数据的速度比消费者消费数据的速度快的一种情况。如果没有处理这种情况,则会出现 MissingBackpressureException 。

由于手工实现 Backpressure 是很困难的,如果使用 fromAsync() 函数则我们只需要理解各种 Backpressure 策略即可,不用自己实现。

BackpressureMode 有如下几种策略:

  • BUFFER(缓存):使用无限个数的内部缓存(RxJava 默认使用 16 个元素的内部缓存),一开始会创建一个 128 个元素的缓冲对象,然后动态的扩展直到 JVM 内存不足。

    使用 hot Observable 的时候通常会指定这种策略,比如上面的示例。

  • LATEST(使用最新的):只发射最新生成的数据,之前的旧的数据被丢弃。类似于使用缓冲个数为 1 的缓存。

    cold Observable 通常可以使用这种策略。比如 Andorid 里面的电量变化、或者 最近的位置信息就可以使用这种策略。之前旧的数据已经为无效数据直接丢弃就好。

  • DROP(直接丢弃):只发射第一个数据,之后的所有数据就丢弃。

    通常用来指生成一个数据的 Observable。

  • ERROR / NONE: 默认的不指定任何策略,会出现 MissingBackpressureException

在封装异步 API 的时候,根据异步 API 的特点,来选择合适的策略是非常重要的。

示例代码

AndroidRxFromAsyncSample 项目演示了本文中的内容。

image

结论

当需要封装现有 API 为 Observable 的时候,可以考虑:

  1. 如果为同步 API 则使用 Observable.fromCallable()
  2. 如果为异步 API 则:
    • 避免使用 Observable.create()
    • 使用 Observable.fromAsync() 并正确实现如下步骤
      • 在合适的地方调用 onNext(), onCompleted(), 和 onError()
      • 如果需要清理资源,则使用 setCancellation()
      • 选择正确的 BackPressureMode 策略

本文出自 云在千峰,转载时请注明出处及相应链接。

本文永久链接: http://blog.chengyunfeng.com/?p=1019

Ɣ回顶部