RxJava 教程第四部分:并发 之意外情况处理

作者: rain 分类: 移动 发布时间: 2016-04-26 21:19 6 条评论

Rx 尽量避免状态泄露到数据流之外的场景。但是有些东西本身就带有状态。比如服务器可以上线和离线、手机可以访问Wifi、按钮被按下了等。在 Rx 中国,我们在一段时间内看到这些事件,并称之为窗口(window)。其他事件在这个窗口内发生可能需要特殊处理。例如,手机在使用移动收费上网的时候,会把网络请求优先级降低,来避免天价流量费的情况。

窗口 Window

buffer 函数可以缓存多个数据并整体发射。 window 操作函数和 buffer 有一对一的关系。区别在于 window 不会整体返回缓存的数据。而是把缓存的数据当做一个新的 Observable 数据流来返回。这样当源 Observable 有数据发射了,这个数据就立刻发射到 window 返回的 Observable 里面了。下图可以看到二者的区别:

image

window :
image

如果你还没有了解 buffer, 建议你到前面的章节下去看看 buffer。 buffer 和 window 的函数形式是一样的,功能也非常类似,并且易于理解。 buffer 都可以用 window 来实现其功能:

Window by count

窗口内可以限定数目。当窗口发射的数据达到了限定的数目,当前窗口的 Observable 就结束并开启一个新的窗口。

image

和buffer 一样, 使用 window(int count, int skip) 也可以跳过数据或者重叠使用数据。

结果:

可以看到当有数据重叠的时候, 多个 Observable 会返回同样的数据,可以把结果输出形式修改一下,方便查看:

结果:
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4]
[4]

这样就可以看到 window 和 buffer 是非常类似的了。

Window by time

同样也可以指定窗口的时间长度:

image

结果:

上面的示例中,每隔100ms开始一个新的窗口,每个窗口持续 250ms。 第一个窗口从 0ms 开始并捕获到数据 [0, 1](0 是在第100ms的时候发射的)。

Window with signal

同样也可以用另外一个信号 Observable当做窗口结束的信号。

image

信号 Observable 直接也可以相互传递事件。

image

下面是使用信号 Observable 实现的重叠窗口:

结果:

注意上面的数字 0 没有捕获到,原因在于源 Observable 和 信号 Observable 都是在同一时刻发生的,但是在实际操作中并没有这种情况。所以当信号 Observable发射的时候, 数字 0 已经发射出去了。

Join

join 可以把两个数据流中的数据组合一起。 zip 函数根据数据发射的顺序来组合数据。 join 可以根据时间来组合。

join 组合的两个 Observable 被称之为 左和右。 上面的函数并不是静态的,调用该函数的 Observable就是 左 。参数中的 leftDurationSelector 和 rightDurationSelector 分别使用 左右发射的数据为参数,然后返回一个定义时间间隔的 Observable,和 window 的最后一个重载函数类似。这个时间间隔是用来选择里面发射的数据并组合一起。里面的数据会当做参数调用 resultSelector ,然后返回一个组合后的数据。然后组合后的数据由 join 返回的 Observable 发射出去。

join 比较难以理解以及强大之处就是如果选择组合的数据。当有数据在 源 Observable 中发射,就开始一个该数据的时间窗口。对应的时间间隔用来计时该数据的窗口何时结束。在时间窗口还没结束的时候,另外一个 Observable 发射的数据就和当前的数据组合一起。左右数据流的处理方式是一样的,所以为了简化介绍,我们假定只有一个 源 Observable 有时间窗口。

下面的示例中, 左Observable 数据流从来不结束而右Observable的时间窗口为 0.

结果:

由于左边的 Observable 时间窗口是永久,这意味着左边每个发射的数据都会和右边的 数据组合。 当右边数据发射的比左边的慢一倍。所以当左边的数据发射了两个对应右边的同一个数据。然后右边发射下一个数据就开启了右边的一个新的时间窗口,然后左右的数据会从开始的数据和右边的新窗口中的数据组合。

下面示例把左右源 Observable 发射的间隔都设置为 100ms,然后把左时间窗口设置为 150ms:

结果:

左右同时发射数据,所以左右同时开始第一个时间窗口,然后组合的数据为 “L0 – R0”。然后 左边的时间窗口继续,而右边发射新的数据 R1 则右边的数据R1和左边的 L0 组合 “L0 – R1”,然后过了 50ms 后, 左边的时间窗口结束了,开启下一个时间窗口,结果为 “L1 – R1”。 一直重复下去。

两个数据流都有时间窗口。每个数据流中的每个值按照如下方式组合:
– 如果旧的数据时间窗口还没有结束,则和另外一个数据流中的每个旧的数据组合
– 如果当前数据的时间窗口还没有结束,则和另外一个数据流中的每个新的数据组合。

groupJoin

只要检测到一个组合数据,join 就用两个数据调用 resultSelector 并发射返回的数据。 而 groupJoin 又有不同的功能:

除了 resultSelector 以外,其他参数和 join 函数的参数是一样的。这个 resultSelector 从左边的数据流中获取一个数据并从右边数据流中获取一个 Observable。这个 Observable 会发射和左边数据配对的所有数据。groupJoin 中的配对和 join 一样是对称的,但是结果是不一样的。可以把 resultSelect 实现为一个 GroupedObservable, 左边的数据当做 key,而把右边的数据发射出去。

还使用第一个 jion的示例,左边的数据流的时间窗口重来不关闭:

结果:

上面的 示例和 jion 中的示例数据配对是一样的,只是 resultSelector 不一样导致输出的结果不一样。

使用 groupJoin 和 flatMap 可以实现 jion的 功能:

通过 join 和 groupBy 也可以实现 groupJoin。在示例代码中有这个实现,感兴趣的可以去看看。

本文出自 云在千峰,转载时请注明出处及相应链接。

本文永久链接: http://blog.chengyunfeng.com/?p=980

Ɣ回顶部