Kotlin Flow? 真香!-灵析社区

德州安卓

前言

由于项目是使用的是MVVM架构,使用DataStore等组件时会发现返回的是Flow

喂!我LiveData都还没有用熟呢,咋就开始用Flow了,既然官方这样推广,当然是道理所在,那本章就说一说Flow。

为了看到第一手准确资料,我是硬着头皮看完了Google官方英语介绍的视频,虽然很多都没有听懂,不过Google的大佬做的图还是很容易理解,本章很多图都是截取视频。

背景

在日常开发中,尤其是APP端经常有同时发起多个请求然后进行展示内容的需求,比如我先拿到标题(因子)先展示出来,再根据因子去加载数据,拿到数据后再进行分类、处理后再进行展示,这种复杂的业务,如果使用Java原来的回调方式,那就会一层套一层,还要不断地切换线程

那当然不用,前面文章也说了使用Kotlin协程可以方便地切换线程以及使用阻塞式地方式写出非阻塞式的代码,那这个问题解决了。

还有一个问题就是数据处理问题,这时你当然会说LiveData也有数据处理的API,比如map等等,说的很有道理,确实也是这个道理,那为什么Google官方又要整这么一出呢?等我后面细细说来。

LiveData有什么不足?

什么是LiveData,它就是一个基于观察者模式的可观察的数据存储器类,同时和普通的观察者又不太一样,它结合了Android生命周期,能感知Android界面的生命周期变化,这种能力可以确保LiveData仅更新处于活跃生命周期状态的应用组件观察者。

所以,LiveData是一个简单易用的组件,同时也就是因为它的简单易用,和之前同是观察者模式的老大哥RxJava比功能就欠缺了不少,所以处理比较复杂的场景会比较麻烦。

由于LiveData设计比较简单,所以有以下不足:

    • LiveData只能在主线程更新数据。
    • LiveData操作符不够多,处理复杂数据时不够用。

虽然我也是刚用Jetpack,我咋记得可以子线程中更新数据呢,不少有个postValue方法吗?

是的,但是postValue方法也需要切换到主线程,再进行更新数据,所以在处理复杂业务时,我从子线程获取的数据,我就想在子线程中先展示一下,然后接着再处理再请求,不想老是切换线程,这时就可以使用Flow了。

注:由于本人没有真正了解、使用过RxJava,所以关于Flow和RxJava的比较就不细说了,主要也就是RxJava入门门槛高,在Android中需要自己处理生命周期,Flow也支持线程切换、有协程Scope兜底不会出现性能浪费等。

Flow简介

因为关于Flow的文章挺少,我就直接看了官方的视频介绍,先不谈什么API和原理,先按照官方文档来普及一波先。

问题

现在有3个耗时任务,分别是A、B、C,在协程中进行,那么使用不同的返回值来接收3个任务的结果会是什么区别呢?

使用List

如图所示,调用foo时会切换线程,

很显然这里的缺点就是必须要等3个任务都执行完,然后把结果放入到一个List中,返回给调用者,如果某个任务执行很久,则需要等待。

示例代码:

runBlocking{
    val list = foo()
    for (x in list) Log.i(TAG, "initData: $x")
}
suspend fun foo(): List<String> = buildList {
    Log.i(TAG, "foo: 开始发送数据")
    delay(1000)
    add("A")
    delay(1000)
    add("B")
    delay(1000)
    add("C")
    Log.i(TAG, "foo: 结束发送数据")
}

再看一下打印结果:

`2021-08-25 14:34:45.307 15141-15141/: foo: 开始发送数据

2021-08-25 14:34:48.309 15141-15141/: foo: 结束发送数据

2021-08-25 14:34:48.311 15141-15141/: initData: A

2021-08-25 14:34:48.312 15141-15141/: initData: B

2021-08-25 14:34:48.312 15141-15141/: initData: C`

这里耗时任务执行了3S多,然后数据全部返回,调用者再进行处理。

使用Channel

接着看一下Channel,什么是Channel的具体定义以及使用我之前也没有使用过,不过从代码注释可以知道它实际就是一个队列,而且是并发安全的,可以用来连接协程,实现不同协程的通信,还是看官方的截图:

从这个图更可以看出Channel就是一个队列,不过这个队列相当于管道一样,一边生产数据,一边消费数据,最主要是它还可以在跨协程工作。

那同样写个小例子:

//先定义一个变量,也就是这个Channel
val channel = Channel<String>()
//开启协程,不断地receive也就是消费管道里地数据
lifecycleScope.launch(Dispatchers.IO) {
    while (true){
        Log.i(TAG, "initData: receive:  ${channel.receive()}")
    }
}
//开启协程发送数据
lifecycleScope.launch {
    foo()
}
//模拟耗时操作
suspend fun foo(){
    Log.i(TAG, "foo: 开始发送数据")
    delay(1000)
    channel.send("A")
    delay(1000)
    channel.send("B")
    delay(1000)
    channel.send("C")
    Log.i(TAG, "foo: 结束发送数据")
}

结果可想而知是什么样子的:

`2021-08-25 14:58:42.236 16024-16024/: foo: 开始发送数据

2021-08-25 14:58:43.240 16024-16062/: initData: receive: A

2021-08-25 14:58:44.247 16024-16061/: initData: receive: B

2021-08-25 14:58:45.252 16024-16024/: foo: 结束发送数据

2021-08-25 14:58:45.254 16024-16062/: initData: receive: C`

可以看出在第一个耗时任务结束发送时,消费者已经开始工作了,不用等待所有任务都结束。

使用Flow

关于Flow可以叫做流,这个和管道Channel设计的很像,也是生产者、消费者模型,一边生产数据,一边消耗数据。

不过官方视频里说Flow是冷流,在有订阅对象时才开始产生数据也就是emit事件,所以在没有collect之前,Flow内部没有任何协程被激活,不会造成资源泄漏。直接看一下官方截图:

示例代码:

//返回一个Flow实例
suspend fun foo(): Flow<String> = flow {
    Log.i(TAG, "foo: 开始发送数据")
    delay(1000)
    emit("A")
    delay(1000)
    emit("B")
    delay(1000)
    emit("C")
    Log.i(TAG, "foo: 结束发送数据")
}
//进行collect
lifecycleScope.launch(Dispatchers.IO) {
    val flow = foo()
    flow.collect {
        Log.i(TAG, "initData: $it")
    }
}
//发送数据
lifecycleScope.launch {
    foo()
}

可以预见,这个结果肯定和使用Channel是一样的,打印:

2021-08-25 15:15:08.507 16419-16458/: foo: 开始发送数据

2021-08-25 15:15:09.515 16419-16457/: initData: A

2021-08-25 15:15:10.522 16419-16458/: initData: B

2021-08-25 15:15:11.529 16419-16457/: initData: C

2021-08-25 15:15:11.529 16419-16457/: foo: 结束发送数据

到这里会发现和使用Channel一模一样的,区别就是Flow这里先拿到了Flow引用,只有去collect的时候,上游才去emit数据。

为了加深印象和区别,总结一下:

Flow介绍

1.Flow是序列形式的,这个是区分List的,下面这个图可以很容易的看出区别。

2.Flow是冷流,什么是冷流?

Flow默认是冷流,如果要使用热流,可以使用SharedFlow,关于2者的区别,可以看下面图:

(1)冷流

冷流可以保证不必要的内存浪费,因为只有去collect时才会触发发射端的协程代码运行,如果有2个Collector,那另一个Collector也只有在collect时才会触发发射端协程运行,且会都跑一遍,如图:

看一下代码:

//foo是冷流Flow
var foo: Flow<String>? = null
//在init函数里 开启线程,去emit数据
lifecycleScope.launch {
    foo()
}
//开启2个协程 都拿到foo,然后进行collect
lifecycleScope.launch(Dispatchers.IO) {
    foo?.collect {
        Log.i(TAG, "initData: A开始收集 $it")
    }
}
//这里的协程,先延迟了2s 再去进行collect
lifecycleScope.launch {
    delay(2000)
    foo?.collect {
        Log.i(TAG, "initData: B开始收集 $it")
    }
}

foo()函数如下:

//这里会每隔1s发送一个数据
suspend fun foo(){
   foo =  flow {
        Log.i(TAG, "foo: 开始发送数据")
        delay(1000)
        Log.i(TAG, "foo: 开始发送A")
        emit("A")
        Log.i(TAG, "foo: 结束发送A")
        delay(1000)
        Log.i(TAG, "foo: 开始发送B")
        emit("B")
        Log.i(TAG, "foo: 结束发送B")
        delay(1000)
        Log.i(TAG, "foo: 开始发送C")
        emit("C")
        Log.i(TAG, "foo: 结束发送C")
        Log.i(TAG, "foo: 结束发送数据")
    }
}

然后看一下打印:

2021-08-30 11:49:13.862 29955-29992/com.wayeal.yunapp I/zyh: foo: 开始发送数据
2021-08-30 11:49:14.868 29955-29992/com.wayeal.yunapp I/zyh: foo: 开始发送A
2021-08-30 11:49:14.870 29955-29992/com.wayeal.yunapp I/zyh: initData: A开始收集 A
2021-08-30 11:49:14.870 29955-29992/com.wayeal.yunapp I/zyh: foo: 结束发送A
2021-08-30 11:49:15.868 29955-29955/com.wayeal.yunapp I/zyh: foo: 开始发送数据
2021-08-30 11:49:15.874 29955-29992/com.wayeal.yunapp I/zyh: foo: 开始发送B
2021-08-30 11:49:15.875 29955-29992/com.wayeal.yunapp I/zyh: initData: A开始收集 B
2021-08-30 11:49:15.875 29955-29992/com.wayeal.yunapp I/zyh: foo: 结束发送B
2021-08-30 11:49:16.870 29955-29955/com.wayeal.yunapp I/zyh: foo: 开始发送A
2021-08-30 11:49:16.871 29955-29955/com.wayeal.yunapp I/zyh: initData: B开始收集 A
2021-08-30 11:49:16.871 29955-29955/com.wayeal.yunapp I/zyh: foo: 结束发送A
2021-08-30 11:49:16.877 29955-29992/com.wayeal.yunapp I/zyh: foo: 开始发送C
2021-08-30 11:49:16.877 29955-29992/com.wayeal.yunapp I/zyh: initData: A开始收集 C
2021-08-30 11:49:16.877 29955-29992/com.wayeal.yunapp I/zyh: foo: 结束发送C
2021-08-30 11:49:16.877 29955-29992/com.wayeal.yunapp I/zyh: foo: 结束发送数据
2021-08-30 11:49:17.873 29955-29955/com.wayeal.yunapp I/zyh: foo: 开始发送B
2021-08-30 11:49:17.873 29955-29955/com.wayeal.yunapp I/zyh: initData: B开始收集 B
2021-08-30 11:49:17.873 29955-29955/com.wayeal.yunapp I/zyh: foo: 结束发送B
2021-08-30 11:49:18.876 29955-29955/com.wayeal.yunapp I/zyh: foo: 开始发送C
2021-08-30 11:49:18.877 29955-29955/com.wayeal.yunapp I/zyh: initData: B开始收集 C
2021-08-30 11:49:18.877 29955-29955/com.wayeal.yunapp I/zyh: foo: 结束发送C
2021-08-30 11:49:18.877 29955-29955/com.wayeal.yunapp I/zyh: foo: 结束发送数据

从上面打印我们不难看出:

    • A收集者开始调用collect时,发射协程开始工作,在13.862秒开始
    • 等待2s后,B收集者开始调用collect时,发射协程又开始工作一遍,在15.868,2者相隔2s多
    • 虽然A收集者已经开始收集了,B收集者开始时,依然又跑一遍

(2)热流

热流是一对多的关系,当有多个collector时,这时发射端发射一个数据,每个collector都能接收到,这个很像那个LiveData观察者模型,数据能得到共享,所以也是被称为SharedFlow。同时和冷流只有在collect时才去跑发射端的协程代码不同,热流会在对象创建出时便开始执行。

同样也是看一下示例代码:

val _events = MutableSharedFlow<String>()
//先开启协程,创建出SharedFlow
lifecycleScope.launch {
    foo1()
}
//立马进行收集
lifecycleScope.launch(Dispatchers.IO) {
    _events.collect {
        Log.i(TAG, "initData: A开始收集 $it")
    }
}
//延迟2秒再进行收集
lifecycleScope.launch {
    delay(2000)
    _events.collect {
        Log.i(TAG, "initData: B开始收集 $it")
    }
}
//一开始就发射A,后面每延迟1s发射一次
suspend fun foo1(){
    Log.i(TAG, "foo: 开始发送数据")
    Log.i(TAG, "foo: 开始发送A")
    _events.emit("A")
    Log.i(TAG, "foo: 结束发送A")
    delay(1000)
    Log.i(TAG, "foo: 开始发送B")
    _events.emit("B")
    Log.i(TAG, "foo: 结束发送B")
    delay(1000)
    Log.i(TAG, "foo: 开始发送C")
    _events.emit("C")
    Log.i(TAG, "foo: 结束发送C")
    Log.i(TAG, "foo: 结束发送数据")
}

打印数据:

2021-08-30 14:04:53.404 8383-8383/com.wayeal.yunapp I/zyh: foo: 开始发送数据
2021-08-30 14:04:53.404 8383-8383/com.wayeal.yunapp I/zyh: foo: 开始发送A
2021-08-30 14:04:53.405 8383-8383/com.wayeal.yunapp I/zyh: foo: 结束发送A
2021-08-30 14:04:54.406 8383-8383/com.wayeal.yunapp I/zyh: foo: 开始发送B
2021-08-30 14:04:54.407 8383-8424/com.wayeal.yunapp I/zyh: initData: A开始收集 B
2021-08-30 14:04:54.407 8383-8383/com.wayeal.yunapp I/zyh: foo: 结束发送B
2021-08-30 14:04:55.415 8383-8383/com.wayeal.yunapp I/zyh: foo: 开始发送C
2021-08-30 14:04:55.421 8383-8426/com.wayeal.yunapp I/zyh: initData: A开始收集 C
2021-08-30 14:04:55.423 8383-8383/com.wayeal.yunapp I/zyh: initData: B开始收集 C
2021-08-30 14:04:55.425 8383-8383/com.wayeal.yunapp I/zyh: foo: 结束发送C
2021-08-30 14:04:55.426 8383-8383/com.wayeal.yunapp I/zyh: foo: 结束发送数据

从上面打印不难看出:

    • 热流在创建时,便开始发射数据
    • 由于A收集器在发射器发射完一个数据才开始collect,所以A收集器也收集不到A
    • 2s后B收集器开始收集,这时它只能被迫接受C,因为A和B数据都错过了,也不会重新再跑一遍发射代码

Flow的详细解析

从前面的代码,我们肯定了解了什么是Flow,以及冷流和热流的概念,那现在我们根据来官方的源码,来介绍一下Flow是如何使用和常见API。

上面图中可以大概说出Flow的特性和使用了,另外补充几点:

1、改善的Flow操作符

前面说了Flow就是一个数据流,在中间可以对它进行各种操作,所以Flow的API为了更好的使用链式调用,把一些API进行了改善,比如在每个数据发送时进行延迟、开始进行延迟等等,这里就使用官方的一张图来表示:

2、背压

在生产者、消费者模型中都避免不了背压,在RxJava中使用了很多策略来应对,不过Flow在设计之处就解决了这个问题,其一是Flow发射和收集数据都是异步的,其二就是在发射或者收集添加延迟,来达到缓解背压的情况:

说道背压,这个也是和Channel有很大的区别,Channel的策略是缓冲区,但是Flow自带承压机制,因为是Cold事件源,如果没有消费者,事件源不会主动emit事件。

collect方法以及flow的构造方法均为suspend,所以均可以延迟,这样如果某端没有准备好,也可以通过延迟来承压,在上图也可以看出,简而言之就是没做事情处理背压。

3、buffer操作符

前面的总结图里说了,Flow是顺序队列,也就是开始collect,发射A,处理A,然后发射B,处理B,这里如果收集端有耗时操作,整个耗时就非常长了,如图

代码例子:

//这里消费者,先collect,但是我消费也需要时间,所以延迟1s
lifecycleScope.launch(Dispatchers.IO) {
    val flow = foo()
    flow.collect {
        Log.i(TAG, "initData: 消费 $it")
        delay(1000)
    }
}
lifecycleScope.launch {
    foo()
}
//为了更好的看出执行顺序,每个发送都加了打印
suspend fun foo(): Flow<String> = flow {
    Log.i(TAG, "foo: 开始发送数据")
    delay(1000)
    Log.i(TAG, "foo: 开始发送A")
    emit("A")
    Log.i(TAG, "foo: 结束发送A")
    delay(1000)
    Log.i(TAG, "foo: 开始发送B")
    emit("B")
    Log.i(TAG, "foo: 结束发送B")
    delay(1000)
    Log.i(TAG, "foo: 开始发送C")
    emit("C")
    Log.i(TAG, "foo: 结束发送C")
    Log.i(TAG, "foo: 结束发送数据")
}

可以看出最后的执行打印:

2021-08-25 15:37:44.640 16997-17035/: foo: 开始发送A
2021-08-25 15:37:44.640 16997-17035/: initData: 消费 A
2021-08-25 15:37:45.641 16997-17036/: foo: 结束发送A
2021-08-25 15:37:46.643 16997-17035/: foo: 开始发送B
2021-08-25 15:37:46.643 16997-17035/: initData: 消费 B
2021-08-25 15:37:47.643 16997-17036/: foo: 结束发送B
2021-08-25 15:37:48.645 16997-17035/: foo: 开始发送C
2021-08-25 15:37:48.645 16997-17035/: initData: 消费 C
2021-08-25 15:37:49.648 16997-17036/: foo: 结束发送C
2021-08-25 15:37:49.649 16997-17036/: foo: 结束发送数据

可以看出这里的操作一共花了6s,如何进行改善呢,就是使用buffer:

SharedFlow的详细解析

说完了Flow是冷流,冷流固然很好,但是用的多的还是SharedFlow,也就是热流,也可以叫成共享流。比如在Android的MVVM架构中,需要使用数据驱动来完成,这时就可以替换LiveData为SharedFlow(当然是后面要说的StateFlow),对于一个flow会有多个订阅者,这时就可以进行观察,从而达到多个订阅者都可以根据数据变化而做出变化。

还是简单看一下总结图:

下面我们来具体分析一波。

(1)、构造函数

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
  • replay:表示当有新的订阅者collect时,发送几个已经发送过的数据给它,默认是0,即默认新订阅者不会获取到订阅之前的数据。 -extraBufferCapacity:表示减去replay,这个Flow还可以缓存多少个数据,默认是0; -onBufferOverflow:表示缓存策略,即缓冲区满了后,如何处理,默认是挂起。

(2)、ShareIn函数

前面说了,如果使用SharedFlow最好使用MutableSharedFlow然后使用emit来发射数据,但是经常有需求是返回一个Flow,这时就需要使用ShareIn函数把Flow转成SharedFlow,ShareIn函数是Flow的扩展函数,看一下参数:

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>
  • scope,这个很好理解,协程范围,表示共享开始时所在的协程作用域范围,之前说过SharedFlow很像LiveData,这里的收集者就是观察者,所以为了控制范围,需要传递一个观察者执行操作的协程范围。
  • replay,这个就是当有一个新的观察者订阅时,需要重新传递给这个新的观察者的数量。z
  • started,这个控制共享的开始和结束的策略。这里有3种策略,分别说一下:

(3)、ShareIn函数上游flow操作符

在前面我们说了一些操作符,这里就可以给应用上,比如我在上游结束、异常等情况

onCompletion

val flow = loginRepository.getLoginName()
flow.onCompletion {
        cause -> if (cause == null) Log.i(TAG, "completed: ")
}.shareIn(viewModelScope, SharingStarted.Eagerly)
    .collect {
        userName.value = it
    }

retry

可以对一些指定的异常进行处理,比如IO异常时进行重连等。

val flow = loginRepository.getLoginName()
flow.onCompletion {
        cause -> if (cause == null) Log.i(TAG, "completed: ")
}.retry(5000) {
    val shallRetry = it is IOException
            if (shallRetry) delay(1000)
            shallRetry
}.shareIn(viewModelScope, SharingStarted.Eagerly)
    .collect {
        userName.value = it
    }

onStart 可以在上游数据开始之前,做一些操作。

val flow = loginRepository.getLoginName()
flow.onStart { 
    emit("start")
}
    .onCompletion {
        cause -> if (cause == null) Log.i(TAG, "completed: ")
}.retry(5000) {
    val shallRetry = it is IOException
            if (shallRetry) delay(1000)
            shallRetry
}.shareIn(viewModelScope, SharingStarted.Eagerly)
    .collect {
        userName.value = it
    }

StateFlow的详细解析

终于到了喜闻乐见的StateFlow了,官方就是希望用这个来替代LiveData,那么它到底是如何来替代LiveData的呢?

(1)、特性

  • StateFlow是SharedFlow的子类,根据前面说的那么它是个热流,可以被多个观察者观察,同时可以设置它消失的scope以及条件。
  • StateFlow只更新最新的数据,也就是它是一个replay为0的SharedFlow。
  • StateFlow里面和LiveData很像,都有个value来保存其值,也可以通过这个属性来获取或者设置它的值。

(2)、使用

  • 使用MutableStateFlow就和LiveData一样,不过它需要一个默认值。
  • 也可以使用stateIn函数把一个Flow转成StateFlow,直接看这个函数:
val result = userId.mapLatest { newUserId ->
    repository.observeItem(newUserId)
}.stateIn(
    scope = viewModelScope,
    started = WhileSubscribed(5000),
    initialValue = Result.Loading
)

从这个最基本的使用,我们可以看出以下信息:

  1. 它的范围是viewModelScope,当viewModelScope结束时,流会停止。
  2. 当观察者都被移除后,超过5s,为了不浪费性能,流会停止。
  3. 会有个默认值,也就是这个initivalValue。

(3)、观察

从上面来看,确实很像LiveData,不过LiveData有个很重要的功能就是具有生命周期感知性能力,在UI处于活跃状态时才会更新数据,那这个StateFlow在收集时并没有传递lifecycleOwner,那如何达到一样的效果呢?

首先是观察的操作执行在协程中,这个协程的范围是lifecycleScope不错,但是是直接launch还是launchWhenStarted呢,看下面这个图:

从这个图我们发现如果我们直接使用launch的话,可能会在onStop执行时这时来个数据更新,我的View根本没法更新,所以会造成错误,这里要达到和LiveData一样的效果在界面活跃时进行更新,所以这里启动协程就需要使用launchWhenStarted。

这个看起来没啥问题,不过我们前面不是有个WhileSubscribed这个优化项呢,表示没有观察者时5s后停止上游数据发射,那现在这个则无法做到,这时就需要使用repeatOnLifecycle,直接看图:

这个repeatOnLifecycle的作用就是当满足特定状态时启动协程,并且在生命周期退出这个状态时停止协程。那这就可以比如我APP退到后台,这时如果超过5S,则不会再订阅,协程终止,这时上游发射逻辑也会停止,提高性能。当APP再切换到前台时,会执行onStart,这时又会有观察者,这时上游逻辑又会开始,所以最佳的写法如下:

onCreateView(...) {
    viewLifecycleOwner.lifecycleScope.launch {
        viewLifecycleOwner.lifecycle.repeatOnLifecycle(STARTED) {
            myViewModel.myUiState.collect { ... }
        }
    }
}

总结

看完了关于Flow的介绍,我们会发现它不仅有类似RxJava的强大操作符,也可以和JetPack一起使用,在处理复杂数据上会比LiveData方便很多。

阅读量:2017

点赞量:0

收藏量:0