引言
在 Kotlin 协程中,我们经常需要在多个协程之间进行通信。Kotlin 最初提供了 Channel
作为协程间通信的原语。但是 Channel
有其局限性,不适合一对多或者多对多的通信场景。
因此 Kotlin 后来引入了 BroadcastChannel
接口作为广播通道,它允许多个订阅者独立接收事件或状态更新。但是经过一段时间的使用, BroadcastChannel
也暴露出了设计缺陷。
从 Kotlin 协程 1.4 版本开始, 我们引入了一个更好的解决方案——共享流 (Shared Flow)。本文将详细介绍共享流的思想以及如何使用它。
流的简单性
在库的早期版本中,我们只有channels
,我们试图通过将 channel
作为参数和返回值来实现各种异步序列的转换。这意味着例如 filter
操作符会在自己的协程中运行。
但是这样的性能远远不如直接使用if语句。回想一下,这并不奇怪,因为 channel
是一个同步原语。任何 channel
, 即使是优化过的单生产者单消费者实现,也必须支持并发的通信协程,它们之间的数据传输需要同步,这在现代多核系统中代价昂贵。
当你开始在异步数据流的基础上构建应用程序体系结构时,对转换的需求自然而然地出现了,并且 channel
的成本开始积累。
Kotlin Flow 的简单设计允许高效实现转换操作符。在基本情况下,值是在同一个协程中发出、转换和收集的,不需要任何同步。
流是冷的
然而,流通常是冷的——通过 flow { ... }
构建器函数创建的 Flow<Value>
是一个被动实体。考虑下面的代码:
流本身没有备份的任何计算,也没有任何状态,直到它开始被收集。每个收集协程都执行自己的发送代码实例。“冷流,热通道”的故事描述了 Kotlin 流背后的原因,并展示了其比频道更适合的用例——按需计算返回的异步值流。
共享流 (SharedFlow)
那么,该如何处理像用户操作、外部设备事件、状态更新等内容呢?它们与是否有任何对它们感兴趣的代码无关而独立操作。它们应支持应用程序内的多个观察者。这些是所谓的热事件源。
这就是共享流的概念登场的地方。共享流存在与否与其是否被收集无关。共享流的收集器称为订阅者。共享流的所有订阅者接收相同的一系列值。它有效地像一个“广播频道”,没有大部分频道开销。它使广播频道的概念过时了。
本质上,共享流是一个轻量级的广播事件总线,您可以在应用程序体系结构中创建和使用它。
它具有可调的参数,例如要为新订阅者保留和重播的旧事件数量以及额外的缓冲区容量,以便快速发射器和慢订阅者之间提供缓冲。
共享流的所有订阅者都在自己的上下文中异步收集事件。 发射器不等待订阅者完成处理事件。 但是,当共享流缓冲区已满时,发射器将暂停,直到缓冲区中有空间。 暂停发射器来处理缓冲区溢出为收集者提供了背压,以降低发射速率。 通过 BufferOverlow
参数支持各种替代策略来处理缓冲区溢出。
状态流 (StateFlow)
处理缓冲区溢出的一种流行方法是放弃最老的事件,只保留最新和最新的事件。 特别是,这是为状态变量建模应用程序的绝佳方法。 这是一个非常广泛的用例,它有自己的专门的 StateFlow
类型,可用作过时的 ConflatedBroadcastChannel
的替代品。
可以将 val x:StateFlow<T>
视为 var x:T
的异步和可观察的对应物。 最新的值总是可用的,事实上,只有最新的值才重要,因此更新它总是可能的,无需挂起。
通过状态流,复杂的频道和简单的流之间的性能差异变得非常明显。 状态流的实现具有无分配更新,这在合并广播通道中并非如此。
渠道的用例 (Channel)
随着各种共享流替换了不同种类的广播通道,一个常见的问题是平原的常规通道会发生什么。它们存在的原因有很多。一个原因是通道是实现许多复杂流操作符的低级原语。
但频道也有自己的应用情况。 频道用于处理必须精确处理一次的事件。 当此类事件通常只有一个订阅者,但间歇性地(在启动或某种重新配置期间)没有订阅者时会发生这种情况,并且需要保留发布的所有事件,直到出现订阅者。
BroadcastEventBus
和 SingleShotEventBus
都公开了 Flow<Event>
, 但它们有一个重要的区别。
使用共享流时,事件广播到未知数量(零或多个)的订阅者。 如果没有订阅者,则会立即删除任何已发布的事件。 这是必须立即处理或根本不处理的事件的设计模式。
使用通道时,每个事件都传递给单个订阅者。 如果没有订阅者,发布事件时,一旦通道缓冲区已满,则会挂起等待订阅者出现。 已发布的事件不会被删除。
总结
了解区别并适当使用共享流和通道。 它们都很有用, 并且设计良好,可以很好地协同工作。 但是, 广播通道是过时的产物, 它们将被弃用并在未来删除。