前面文章我们介绍了Channel
,它可以用于协程间的通信,我们可以把它看成是一个密闭的管道,管道俩头可以发送和接收数据。
在使用协程处理异步任务时,在简单的场景中,我们可以使用launch
、async
、挂起函数、Channel
等来实现,但是对于复杂的逻辑操作,比如一个数据源需要从IO线程中获取,然后进行过滤和处理,再在UI线程展示,再在IO线程中进行保存,这种就需要使用更强大的组件,即Flow
。
Flow及其强大,及其灵活
,可以这样说,在Flow
出现之前,Kotlin的挂起函数、结构化并发可能不足以形成协程的核心竞争力,但是类似RxJava
的Flow
出现后,Kotlin的协程就被大家认可了。
学会使用Flow
,不仅仅可以让我们在切换线程、处理数据上更加方便,Flow
的链式调用风格的API可以让代码更加容易阅读。理解Flow
的原理,以及挂起API的实现,更有益于让我们写出Flow
风格的代码。
把Channel
比喻为管道,因为它只发送接这2个操作,数据在封闭的管道中进行传递。但是Flow
就不一样了,这个是"流"的概念,即可以把Flow
比喻为一条河流,河流中流淌的是数据,这个数据从河流的发源地开始出发,可以在中间被各种处理厂进行处理,最后流入大海。
所以Flow
就是一个数据流,它也有上下游的概念,比如下图:
数据从发源地开始,可以经过多个中转站进行处理。
既然Flow
是数据流,那流中的数据是如何产生的呢?这就需要上游操作符,我们直接看代码:
fun main() = runBlocking {
//上游,发源地
flow {
//挂起函数,emit是挂起函数
emit(1)
emit(2)
emit(3)
emit(4)
emit(5)
}.filter { it > 2 } // 中转站1
.map { it * 2 } // 中转站2
.take(2) // 中转站3
.collect{ // 下游
println(it)
}
}
上面代码中就创建了一个Flow
,然后往Flow
中发送了5个数据,再通过中间3个中转站对数据进行了处理,最后通过collect
高阶函数进行收集,上面代码的运行结果如下:
这里完全符合预期,而且这种链式调用非常符合阅读习惯。
flow{}
创建Flow
我们这里先重点看一下这里的flow{}
高阶函数,它的作用是创建一个新的Flow
,同时在它的lambda中我们使用emit()
挂起函数往这个Flow
中发送数据,它是一个上游操作符。
所以上游操作符的作用是创建一个Flow
,然后负责往Flow
中发送数据;类比于现实中,河流的水也是从上游产生的一样,所以上游操作符不仅要创建Flow
,还负责发送数据。
我们可以简单看一下flow{}
函数定义:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T>
= SafeFlow(block)
首先它是一个顶层函数,然后block
是函数类型,而且它的接收者是FlowCollector
类型,根据简写约定,block
就相当于是FlowCollector
的成员函数,所以它可以调用FlowCollector
类型中的方法、变量,该接口:
public fun interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
这里我们可以发现我们可以在block
中调用emit
也就知道原因了。
这里还有一个疑惑,flow
方法不是扩展函数,但是block
中可以调用emit
方法,这个emit
方法是哪个对象的呢?
我们简单来看一下其实现SafeFlow
:
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
flow{}
函数会返回一个SafeFlow
对象,然后在其实现方法collectSafely
中会有一个FlowCollector
类型的对象,然后可以利用该对象调用block
,即collector.block()
,这里也进一步说明了前面的约定观点。
那这个collector
对象是什么赋值的呢?具体原理研究,等后面原理篇文章再细说。
flowOf
和asFlow
创建Flow
除了上面方法外,还有类似于集和API的arrayListOf
等的flowOf
函数,以及asFlow
把集和转换为Flow,代码如下:
fun main() = runBlocking {
listOf(1,2,3,4,5).asFlow()
.filter { it > 2 } // 中转站1
.map { it * 2 } // 中转站2
.take(2) // 中转站3
.collect{ // 下游
println(it)
}
flowOf(1,2,3,4,5)
.filter { it > 2 } // 中转站1
.map { it * 2 } // 中转站2
.take(2) // 中转站3
.collect{ // 下游
println(it)
}
}
这2种方法也可以创建Flow
并且往里发送数据。
我们来简单看一下asFlow
和flowOf
:
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
for (element in elements) {
emit(element)
}
}
可以发现这2个方法都是调用flow{}
高阶函数函数来实现的,并且都是遍历调用emit
方法来完成发送数据,所以核心还是这个emit
方法,后面文章再说其原理。
所以这里上游操作符的创建Flow
的方式有3种:
Flow 创建方式 | 使用场景 | 用法 |
---|---|---|
flow{} | 未知的数据集,用emit 发送数据 | flow { emit(1)} |
flowOf() | 已知数据,类似于集合的arrayOf | flowOf(1,2,3) |
asFlow() | 已知数据集合 | list.asFlow() |
我们在日常项目中需要根据具体情况来使用。
当数据被发送到Flow
中,就可以使用中间操作符来对流中的数据进行处理,由上面例子代码中我们可以 发现比如filter
、map
这些类似的API其实就是从集和那边抄来的,这些操作符我相信看名字就能够识别和使用。所以这里重点说一些和集和无关的API。
Flow
生命周期回调函数在Flow
的中间操作符中有2个比较特殊的操作符,甚至他们有点不像是中间操作符,就是onStart
和onCompletion
,其实是Flow
开始和完成的回调函数,我们来看个例子:
fun main() = runBlocking {
flow{
println("发射 1")
emit(1)
println("发射 2")
emit(2)
println("发射 3")
emit(3)
println("发射 4")
emit(4)
println("发射 5")
emit(5)
}
.filter {
println("filter $it")
it > 2
}
.map {
println("map $it")
it * 2
}
.take(2)
.onStart {
println("start")
}
.onCompletion {
println("onCompletion")
}
.collect{
println("collect $it")
}
}
这里逻辑就不说了,其中onStart
就是Flow
开始的回调,而onCompletion
就是Flow
结束的回调,通过这个我们可以看出Flow
的运行状态,同时这2个操作符和位置是无关的,虽然onStart
已经放在了发射和几个操作数据的操作符后面,但是在执行时他却是最先执行的,下面是运行结果:
而其他操作数据的操作符是和所在的位置有关的,这个也非常容易理解,比如我们把take(2)
放到.filter
前,打印结果如下:
这里结果就会发生变化,也非常好理解,因为Flow
是数据流,中间的操作会影响后的。所以这也是为什么感觉onStart
和onCompletion
不太像中间操作符就是这个原因。
而这里onCompletion
回调触发的情况有下面3种情况:
Flow
正常执行完毕;Flow
当中出现异常;Flow
被取消。其实这些都挺好理解的,在介绍协程的句柄Job
那一章我们说过类似的逻辑。
这里我们还是简单看看onStart
方法:
public fun <T> Flow<T>.onStart(
action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow {
val safeCollector = SafeCollector<T>(this, currentCoroutineContext())
try {
safeCollector.action()
} finally {
safeCollector.releaseIntercepted()
}
collect(this) // directly delegate
}
首先,调用onStart
方法会返回一个新的Flow
,其次action
函数类型接收者是FlowCollector
,所以可以在onStart
的lambda
中调用emit
方法,并且会在上游发送数据前先执行,比如如下代码
flowOf("a", "b", "c")
.onStart { emit("Begin") }
.collect { println(it) }
这里会打印Begin,a,b,c
,也会说明onStart
方法的执行优先级非常高,和位置无关。
这里衍生一点,就是SharedFlow
,因为Flow
是冷的,而SharedFlow
是热的,所以当onStart
和SharedFlow
一起使用时,就无法保证onStart
一定会在发送数据之前执行,这时可以使用onSubscription
其他API来完成,关于这点,后面说SharedFlow
再细说。
接着来看看onCompletion
:
public fun <T> Flow<T>.onCompletion(
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow {
try {
collect(this)
} catch (e: Throwable) {
ThrowingCollector(e).invokeSafely(action, e)
throw e
}
// Normal completion
val sc = SafeCollector(this, currentCoroutineContext())
try {
sc.action(null)
} finally {
sc.releaseIntercepted()
}
}
该方法会在flow
正常完成、被取消后执行,并且运行给定的action
。这里我们需要注意一点,就是action
的函数类型是(Throwable?) -> Unit
,即当程序发生异常时,或者调用cancel()
会产生一个CancellationException
时,这个异常信息会被传递到onCompletion
的action
中。
和下面要说的catch
不同,这个操作符会抛出上游和下游的异常,和位置关系是不固定的,如果Flow
在执行过程中没有异常,则cause
会是null
。
还有一个神奇操作,就是action
的接收者类型是FlowCollector
,所以在action
中依旧可以调用emit
发送数据,由上面源码可知,在onCompletion
中emit
的数据,会在最后被收集。
在前面我们知道Flow
就是数据流,它有3个部分组成:上游、中间操作和下游,那么当Flow
中发生异常时,也可以根据这个标准来进行分类,也就是异常发生的位置。
catch
捕获异常当Flow
的异常发生在上游或者中间时,可以使用catch
进行捕获异常,注意catch
这个操作符就是和我们平时使用的try-catch
意义用来捕获异常,只不过这个是用在Flow
中的。
同时catch
操作符的作用和它的位置是强相关的,比如下面代码:
fun main() = runBlocking {
flow{
emit(1)
emit(2)
throw IllegalStateException()
emit(3)
}.map { it * 2 }
.catch { println("catch exception: $it") }
.collect{
println("collect : $it")
}
}
这里在上游操作符中抛出一个异常,这里就会被捕获到,同时会终止Flow
的继续发射和执行,所以打印如下:
注意这里catch只能捕获发生在它上游的异常,当异常发生时不再继续发生数据和执行数据。
try-catch
捕获下游异常这里说了catch
是一个中间操作符,可以捕获在它之前的异常,那对于下游操作符的异常呢 比如在collect
高级函数中发生异常,这里最简单的办法就是直接使用try-catch
即可,比如下面代码:
fun main() = runBlocking {
flow{
emit(1)
emit(2)
throw IllegalStateException()
emit(3)
}.map { it * 2 }
.catch { println("catch exception: $it") }
.collect{
try {
println("collect : $it")
throw IllegalStateException()
}catch (e: Exception){
println("Catch : $e")
}
}
}
这里直接就在下游操作符中使用普通的try-catch
进行捕获异常。
关于异常捕获,我们后面有文章单独介绍,这里我们只需要知道Flow
提供的catch
中间操作符是和位置有关的,只能捕获它上游的异常,想捕获终止操作符中的异常,还是得老老实实使用try-catch
。
Context
前面说了Flow
在复杂业务上可以取代RxJava
,而复杂的业务经常需要频繁地切换工作的线程,对于耗时任务,我们需要在线程池中执行,对于UI任务,我们需要在主线程上执行,而在Flo
w当中,我们可以借助flowOn
这个中间操作符便可以完成需求。
flowOn
切换上游的上下文还是看个示例代码,如下:
fun main() = runBlocking {
flow {
logX("Start")
emit(1)
logX("Emit 1")
emit(2)
logX("Emit 2")
emit(3)
logX("Emit 3")
}.filter {
logX("Filter: $it")
it > 2
}.flowOn(Dispatchers.IO)
.collect{
logX("Collect $it")
}
}
和前面的catch
操作符的作用域一样,它的作用域只对它的上游有作用,上面代码的运行结果如下:
================================
Start
Thread:DefaultDispatcher-worker-1 @coroutine#2
================================
================================
Filter: 1
Thread:DefaultDispatcher-worker-1 @coroutine#2
================================
================================
Emit 1
Thread:DefaultDispatcher-worker-1 @coroutine#2
================================
================================
Filter: 2
Thread:DefaultDispatcher-worker-1 @coroutine#2
================================
================================
Emit 2
Thread:DefaultDispatcher-worker-1 @coroutine#2
================================
================================
Filter: 3
Thread:DefaultDispatcher-worker-1 @coroutine#2
================================
================================
Emit 3
Thread:DefaultDispatcher-worker-1 @coroutine#2
================================
================================
Collect 3
Thread:main @coroutine#1
================================
Process finished with exit code 0
可以发现在Flow
中,emit
和filter
所在的协程2运行在子线程中,而collect
运行在主线程中,这也就印证了前面所说的作用域问题。
launchIn
指定CoroutineScope
前面和catch
操作符一样的问题,它的作用域范围只是它前面的部分,同时假如多次调用flowOn
则当前flowOn
的范围是到上一个flowOn
,这个也非常好理解。
那下面就来解决如何指定下游操作符的运行线程,或者更直接的就是flowOn
后面的中间操作符和终止操作符所运行的线程,Kotlin这里提供了一个叫做launchIn
的操作符,它可以把一部分操作指定Scope
,示例代码如下:
//新建一个Dispatcher
val mySingleDispatcher = Executors.newSingleThreadExecutor {
Thread(it, "MySingleThread").apply {
isDaemon = true
}
}.asCoroutineDispatcher()
fun main() = runBlocking {
//创建一个CoroutineScope
val scope = CoroutineScope(mySingleDispatcher)
flow {
logX("Start")
emit(1)
logX("Emit 1")
emit(2)
logX("Emit 2")
emit(3)
logX("Emit 3")
}.flowOn(Dispatchers.IO)
.filter {
logX("Filter: $it")
it > 2
}.onEach {
logX("onEach $it")
}
//指定运行在哪个协程范围内
.launchIn(scope)
delay(1000)
}
这里的代码比较特殊,我们可以肯定地是发射数据地代码肯定是在IO线程中,而filter
和onEach
中的代码运行在什么地方呢?
通过运行我们会发现这个俩部分代码会运行在我们自定义的线程池中:
================================
Start
Thread:DefaultDispatcher-worker-1 @coroutine#3
================================
================================
Emit 1
Thread:DefaultDispatcher-worker-1 @coroutine#3
================================
================================
Emit 2
Thread:DefaultDispatcher-worker-1 @coroutine#3
================================
================================
Emit 3
Thread:DefaultDispatcher-worker-1 @coroutine#3
================================
================================
Filter: 1
Thread:MySingleThread @coroutine#2
================================
================================
Filter: 2
Thread:MySingleThread @coroutine#2
================================
================================
Filter: 3
Thread:MySingleThread @coroutine#2
================================
================================
onEach 3
Thread:MySingleThread @coroutine#2
================================
Process finished with exit code 0
这里的疑点我们来慢慢解答:
onEach
操作符是什么东西,就是返回一个新的Flow
,但是把每个上游的值都在其高级函数中执行一遍,比如这里就是println一下,然后再emit
到新的Flow
中,源码如下:public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
action(value)
return@transform emit(value)
}
可以看出,这里出现了一个新的Flow
。
launchIn
操作又是啥呢 为什么没有终止操作符,这里的代码依旧可以运行,我们来看一下源码:public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
collect() // tail-call
}
会发现launchIn
是Flow
的扩展函数,而且直接在scope
调用launch
启动了一个新协程,而协程中调用了collect()
,这也就说明了为什么这里没有终止操作符的原因。
所以严格意义上说launchIn
算是一个终止操作符,把它上游的代码都分发到指定的线程当中。
同时这个launchIn操作符在源码中有特殊的使用说明,代码注释如下:
This operator is usually used with onEach,
onCompletion and catch operators to process all emitted values handle an exception that might occur in the upstream
flow or during processing,
for example:
flow
.onEach { value -> updateUi(value) }
.onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }
.catch { cause -> LOG.error("Exception: $cause") }
.launchIn(uiScope)
即这个操作符经常和onEach
、onCompletion
、catch
一起使用,处理所有发出的值,处理可能在上游或者处理过程中发生的异常。
最后,我们到了Flow
的下游部分,这里可以使用终止操作符来终止整个Flow
,当使用了终止操作符,我们再也无法使用map
这种中间操作符了。
最常用的就是collect
终止操作符,代表数据流的终止。同时还有一些从集合中抄过来的操作符,比如first()
、single()
、fold
、reduce
等等。
另外把Flow
转换为集合的时候,本身也意味着Flow
数据流的终止,比如toList
,也算是终止操作符。
上一篇文章我们说了Channel
即管道这个是热的,特点就像是热情的服务员,不管你想不想喝水,都会给你端茶递水,那这里的Flow
其实也非常好理解,我们看个代码:
fun main() = runBlocking {
// 冷数据流
val flow = flow {
(1..3).forEach {
println("Before send $it")
emit(it)
println("Send $it")
}
}
// 热数据流
val channel = produce<Int>(capacity = 0) {
(1..3).forEach {
println("Before send $it")
send(it)
println("Send $it")
}
}
println("end")
}
上面代码的打印:
可以发现:Channel
之所以认为是热的,是因为不管有没有接收方,发送方都会工作;而Flow
冷的原因是,只有调用终止操作符以后,Flow
才会工作。
默认情况下,Flow
不仅是冷的,还是懒的,比如下面代码:
fun main() = runBlocking {
flow {
println("emit: 3")
emit(3)
println("emit: 4")
emit(4)
println("emit: 5")
emit(5)
}.filter {
println("filter: $it")
it > 2
}.map {
println("map: $it")
it * 2
}.collect {
println("collect: $it")
}
}
这里的执行结果如下:
会发现它一次只处理一个数据,结合上一篇文章的服务员端茶倒水的例子,就是Flow
不仅是一个冷淡的服务员,还是一个懒惰的服务员,明明饭桌上有3个人需要喝水,但服务员偏偏不一次递上3杯水,而是每个人都叫服务员一次,服务员才一杯一杯地把水递过来。
这篇文章内容很多,也是Kotlin的一个重点,Flow
在Android中有很多地方使用,等后面说一下Flow
的原理,再来讨论在Android中如何使用Flow
。
总体来说,Flow
和Channel
一样,都是用来协程间的通信的,可以让我们更方便的处理复杂逻辑。
整个Flow
的API设计可以分为上游、中间操作符和下游操作符:
Flow
,同时产生数据,主要有3个API:flow{}
、flowOf()
和asFlow()
;onStart
和onCompletion
;第三类是捕获异常catch操作符
;第四类是切换context
,比如flowOn
和launchIn
。collect
;第二类就是集合抄过来的;第三类就是Flow
转集合的API,比如flow.toList()
。最后就是和Channel
对比,说Flow
为什么是冷的,以及优势和劣势。
阅读量:939
点赞量:0
收藏量:0