协程(10) | Flow-灵析社区

德州安卓

前言

前面文章我们介绍了Channel,它可以用于协程间的通信,我们可以把它看成是一个密闭的管道,管道俩头可以发送和接收数据。

在使用协程处理异步任务时,在简单的场景中,我们可以使用launchasync、挂起函数、Channel等来实现,但是对于复杂的逻辑操作,比如一个数据源需要从IO线程中获取,然后进行过滤和处理,再在UI线程展示,再在IO线程中进行保存,这种就需要使用更强大的组件,即Flow

Flow及其强大,及其灵活,可以这样说,在Flow出现之前,Kotlin的挂起函数、结构化并发可能不足以形成协程的核心竞争力,但是类似RxJavaFlow出现后,Kotlin的协程就被大家认可了。

学会使用Flow,不仅仅可以让我们在切换线程、处理数据上更加方便,Flow的链式调用风格的API可以让代码更加容易阅读。理解Flow的原理,以及挂起API的实现,更有益于让我们写出Flow风格的代码。

正文

Channel比喻为管道,因为它只发送接这2个操作,数据在封闭的管道中进行传递。但是Flow就不一样了,这个是"流"的概念,即可以把Flow比喻为一条河流,河流中流淌的是数据,这个数据从河流的发源地开始出发,可以在中间被各种处理厂进行处理,最后流入大海。

所以Flow就是一个数据流,它也有上下游的概念,比如下图:

数据从发源地开始,可以经过多个中转站进行处理。

上游操作符

既然Flow是数据流,那流中的数据是如何产生的呢?这就需要上游操作符,我们直接看代码:

fun main() = runBlocking {
    //上游,发源地
    flow {  
        //挂起函数,emit是挂起函数
        emit(1)              
        emit(2)
        emit(3)
        emit(4)
        emit(5)
    }.filter { it > 2 }     // 中转站1
        .map { it * 2 }     // 中转站2
        .take(2)            // 中转站3
        .collect{           // 下游
            println(it)
        }
}

上面代码中就创建了一个Flow,然后往Flow中发送了5个数据,再通过中间3个中转站对数据进行了处理,最后通过collect高阶函数进行收集,上面代码的运行结果如下:

这里完全符合预期,而且这种链式调用非常符合阅读习惯。

flow{}创建Flow

我们这里先重点看一下这里的flow{}高阶函数,它的作用是创建一个新的Flow,同时在它的lambda中我们使用emit()挂起函数往这个Flow发送数据,它是一个上游操作符。

所以上游操作符的作用是创建一个Flow,然后负责往Flow中发送数据;类比于现实中,河流的水也是从上游产生的一样,所以上游操作符不仅要创建Flow,还负责发送数据。

我们可以简单看一下flow{}函数定义:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> 
    = SafeFlow(block)

首先它是一个顶层函数,然后block是函数类型,而且它的接收者是FlowCollector类型,根据简写约定,block就相当于是FlowCollector的成员函数,所以它可以调用FlowCollector类型中的方法、变量,该接口:

public fun interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

这里我们可以发现我们可以在block中调用emit也就知道原因了。

这里还有一个疑惑,flow方法不是扩展函数,但是block中可以调用emit方法,这个emit方法是哪个对象的呢?

我们简单来看一下其实现SafeFlow:

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

flow{}函数会返回一个SafeFlow对象,然后在其实现方法collectSafely中会有一个FlowCollector类型的对象,然后可以利用该对象调用block,即collector.block(),这里也进一步说明了前面的约定观点。

那这个collector对象是什么赋值的呢?具体原理研究,等后面原理篇文章再细说。

flowOfasFlow创建Flow

除了上面方法外,还有类似于集和API的arrayListOf等的flowOf函数,以及asFlow把集和转换为Flow,代码如下:

fun main() = runBlocking {
    listOf(1,2,3,4,5).asFlow()
        .filter { it > 2 }     // 中转站1
        .map { it * 2 }        // 中转站2
        .take(2)               // 中转站3
        .collect{              // 下游
            println(it)
        }

    flowOf(1,2,3,4,5)
        .filter { it > 2 }     // 中转站1
        .map { it * 2 }        // 中转站2
        .take(2)               // 中转站3
        .collect{              // 下游
            println(it)
        }
}

这2种方法也可以创建Flow并且往里发送数据。

我们来简单看一下asFlowflowOf:

public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}

可以发现这2个方法都是调用flow{}高阶函数函数来实现的,并且都是遍历调用emit方法来完成发送数据,所以核心还是这个emit方法,后面文章再说其原理。

所以这里上游操作符的创建Flow的方式有3种:

Flow创建方式使用场景用法
flow{}未知的数据集,用emit发送数据flow { emit(1)}
flowOf()已知数据,类似于集合的arrayOfflowOf(1,2,3)
asFlow()已知数据集合list.asFlow()

我们在日常项目中需要根据具体情况来使用。

中间操作符

当数据被发送到Flow中,就可以使用中间操作符来对流中的数据进行处理,由上面例子代码中我们可以 发现比如filtermap这些类似的API其实就是从集和那边抄来的,这些操作符我相信看名字就能够识别和使用。所以这里重点说一些和集和无关的API。

Flow生命周期回调函数

Flow的中间操作符中有2个比较特殊的操作符,甚至他们有点不像是中间操作符,就是onStartonCompletion,其实是Flow开始和完成的回调函数,我们来看个例子:

fun main() = runBlocking {
    flow{
        println("发射 1")
        emit(1)
        println("发射 2")
        emit(2)
        println("发射 3")
        emit(3)
        println("发射 4")
        emit(4)
        println("发射 5")
        emit(5)
    }
        .filter {
            println("filter $it")
            it > 2
        }
        .map {
            println("map $it")
            it * 2
        }
        .take(2)
        .onStart {
            println("start")
        }
        .onCompletion {
            println("onCompletion")
        }
        .collect{
            println("collect $it")
        }
}

这里逻辑就不说了,其中onStart就是Flow开始的回调,而onCompletion就是Flow结束的回调,通过这个我们可以看出Flow的运行状态,同时这2个操作符和位置是无关的,虽然onStart已经放在了发射和几个操作数据的操作符后面,但是在执行时他却是最先执行的,下面是运行结果:

其他操作数据的操作符是和所在的位置有关的,这个也非常容易理解,比如我们把take(2)放到.filter前,打印结果如下:

这里结果就会发生变化,也非常好理解,因为Flow是数据流,中间的操作会影响后的。所以这也是为什么感觉onStartonCompletion不太像中间操作符就是这个原因。

而这里onCompletion回调触发的情况有下面3种情况:

  • Flow正常执行完毕;
  • Flow当中出现异常;
  • Flow被取消。

其实这些都挺好理解的,在介绍协程的句柄Job那一章我们说过类似的逻辑。

这里我们还是简单看看onStart方法:

public fun <T> Flow<T>.onStart(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow {
    val safeCollector = SafeCollector<T>(this, currentCoroutineContext())
    try {
        safeCollector.action()
    } finally {
        safeCollector.releaseIntercepted()
    }
    collect(this) // directly delegate
}

首先,调用onStart方法会返回一个新的Flow,其次action函数类型接收者是FlowCollector,所以可以在onStartlambda中调用emit方法,并且会在上游发送数据前先执行,比如如下代码

flowOf("a", "b", "c")
    .onStart { emit("Begin") }
    .collect { println(it) }

这里会打印Begin,a,b,c,也会说明onStart方法的执行优先级非常高,和位置无关。

这里衍生一点,就是SharedFlow,因为Flow是冷的,而SharedFlow是热的,所以当onStartSharedFlow一起使用时,就无法保证onStart一定会在发送数据之前执行,这时可以使用onSubscription其他API来完成,关于这点,后面说SharedFlow再细说。

接着来看看onCompletion:

public fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { 
    try {
        collect(this)
    } catch (e: Throwable) {
        ThrowingCollector(e).invokeSafely(action, e)
        throw e
    }
    // Normal completion
    val sc = SafeCollector(this, currentCoroutineContext())
    try {
        sc.action(null)
    } finally {
        sc.releaseIntercepted()
    }
}

该方法会在flow正常完成、被取消后执行,并且运行给定的action。这里我们需要注意一点,就是action的函数类型是(Throwable?) -> Unit,即当程序发生异常时,或者调用cancel()会产生一个CancellationException时,这个异常信息会被传递到onCompletionaction中。

和下面要说的catch不同,这个操作符会抛出上游和下游的异常,和位置关系是不固定的,如果Flow在执行过程中没有异常,则cause会是null

还有一个神奇操作,就是action的接收者类型是FlowCollector,所以在action中依旧可以调用emit发送数据,由上面源码可知,在onCompletionemit的数据,会在最后被收集。

异常处理函数

在前面我们知道Flow就是数据流,它有3个部分组成:上游、中间操作和下游,那么当Flow中发生异常时,也可以根据这个标准来进行分类,也就是异常发生的位置。

catch捕获异常

Flow的异常发生在上游或者中间时,可以使用catch进行捕获异常,注意catch这个操作符就是和我们平时使用的try-catch意义用来捕获异常,只不过这个是用在Flow中的。

同时catch操作符的作用和它的位置是强相关的,比如下面代码:

fun main() = runBlocking {
    flow{
        emit(1)
        emit(2)
        throw IllegalStateException()
        emit(3)
    }.map { it * 2 }
        .catch { println("catch exception: $it") }
        .collect{
            println("collect : $it")
        }

}

这里在上游操作符中抛出一个异常,这里就会被捕获到,同时会终止Flow的继续发射和执行,所以打印如下:

注意这里catch只能捕获发生在它上游的异常,当异常发生时不再继续发生数据和执行数据。

try-catch捕获下游异常

这里说了catch是一个中间操作符,可以捕获在它之前的异常,那对于下游操作符的异常呢 比如在collect高级函数中发生异常,这里最简单的办法就是直接使用try-catch即可,比如下面代码:

fun main() = runBlocking {
    flow{
        emit(1)
        emit(2)
        throw IllegalStateException()
        emit(3)
    }.map { it * 2 }
        .catch { println("catch exception: $it") }
        .collect{
            try {
                println("collect : $it")
                throw IllegalStateException()
            }catch (e: Exception){
                println("Catch : $e")
            }
        }
}

这里直接就在下游操作符中使用普通的try-catch进行捕获异常。

关于异常捕获,我们后面有文章单独介绍,这里我们只需要知道Flow提供的catch中间操作符是和位置有关的,只能捕获它上游的异常,想捕获终止操作符中的异常,还是得老老实实使用try-catch

切换Context

前面说了Flow在复杂业务上可以取代RxJava,而复杂的业务经常需要频繁地切换工作的线程,对于耗时任务,我们需要在线程池中执行,对于UI任务,我们需要在主线程上执行,而在Flow当中,我们可以借助flowOn这个中间操作符便可以完成需求。

flowOn切换上游的上下文

还是看个示例代码,如下:

fun main() = runBlocking {
    flow {
        logX("Start")
        emit(1)
        logX("Emit 1")
        emit(2)
        logX("Emit 2")
        emit(3)
        logX("Emit 3")
    }.filter {
        logX("Filter: $it")
        it > 2
    }.flowOn(Dispatchers.IO)
        .collect{
            logX("Collect $it")
        }
}

和前面的catch操作符的作用域一样,它的作用域只对它的上游有作用,上面代码的运行结果如下:

================================
Start
Thread:DefaultDispatcher-worker-1 @coroutine#2
================================
================================
Filter: 1
Thread:DefaultDispatcher-worker-1 @coroutine#2
================================
================================
Emit 1
Thread:DefaultDispatcher-worker-1 @coroutine#2
================================
================================
Filter: 2
Thread:DefaultDispatcher-worker-1 @coroutine#2
================================
================================
Emit 2
Thread:DefaultDispatcher-worker-1 @coroutine#2
================================
================================
Filter: 3
Thread:DefaultDispatcher-worker-1 @coroutine#2
================================
================================
Emit 3
Thread:DefaultDispatcher-worker-1 @coroutine#2
================================
================================
Collect 3
Thread:main @coroutine#1
================================

Process finished with exit code 0

可以发现在Flow中,emitfilter所在的协程2运行在子线程中,而collect运行在主线程中,这也就印证了前面所说的作用域问题。

launchIn指定CoroutineScope

前面和catch操作符一样的问题,它的作用域范围只是它前面的部分,同时假如多次调用flowOn则当前flowOn的范围是到上一个flowOn,这个也非常好理解。

那下面就来解决如何指定下游操作符的运行线程,或者更直接的就是flowOn后面的中间操作符和终止操作符所运行的线程,Kotlin这里提供了一个叫做launchIn的操作符,它可以把一部分操作指定Scope,示例代码如下:

//新建一个Dispatcher
val mySingleDispatcher = Executors.newSingleThreadExecutor {
    Thread(it, "MySingleThread").apply {
        isDaemon = true
    }
}.asCoroutineDispatcher()

fun main() = runBlocking {
    //创建一个CoroutineScope
    val scope = CoroutineScope(mySingleDispatcher)
    flow {
        logX("Start")
        emit(1)
        logX("Emit 1")
        emit(2)
        logX("Emit 2")
        emit(3)
        logX("Emit 3")
    }.flowOn(Dispatchers.IO)
        .filter {
        logX("Filter: $it")
        it > 2
    }.onEach {
            logX("onEach $it")
        }
        //指定运行在哪个协程范围内
        .launchIn(scope)

    delay(1000)
}

这里的代码比较特殊,我们可以肯定地是发射数据地代码肯定是在IO线程中,而filteronEach中的代码运行在什么地方呢?

通过运行我们会发现这个俩部分代码会运行在我们自定义的线程池中:

================================
Start
Thread:DefaultDispatcher-worker-1 @coroutine#3
================================
================================
Emit 1
Thread:DefaultDispatcher-worker-1 @coroutine#3
================================
================================
Emit 2
Thread:DefaultDispatcher-worker-1 @coroutine#3
================================
================================
Emit 3
Thread:DefaultDispatcher-worker-1 @coroutine#3
================================
================================
Filter: 1
Thread:MySingleThread @coroutine#2
================================
================================
Filter: 2
Thread:MySingleThread @coroutine#2
================================
================================
Filter: 3
Thread:MySingleThread @coroutine#2
================================
================================
onEach 3
Thread:MySingleThread @coroutine#2
================================

Process finished with exit code 0

这里的疑点我们来慢慢解答:

  • onEach操作符是什么东西,就是返回一个新的Flow,但是把每个上游的值都在其高级函数中执行一遍,比如这里就是println一下,然后再emit到新的Flow中,源码如下:
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
    action(value)
    return@transform emit(value)
}

可以看出,这里出现了一个新的Flow

  • launchIn操作又是啥呢 为什么没有终止操作符,这里的代码依旧可以运行,我们来看一下源码:
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() // tail-call
}

会发现launchInFlow的扩展函数,而且直接在scope调用launch启动了一个新协程,而协程中调用了collect(),这也就说明了为什么这里没有终止操作符的原因。

所以严格意义上说launchIn算是一个终止操作符,把它上游的代码都分发到指定的线程当中。

同时这个launchIn操作符在源码中有特殊的使用说明,代码注释如下:

This operator is usually used with onEach, 
onCompletion and catch operators to process all emitted values handle an exception that might occur in the upstream 
flow or during processing, 
for example:
flow
    .onEach { value -> updateUi(value) }
    .onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }
    .catch { cause -> LOG.error("Exception: $cause") }
    .launchIn(uiScope)

即这个操作符经常和onEachonCompletioncatch一起使用,处理所有发出的值,处理可能在上游或者处理过程中发生的异常。

终止操作符

最后,我们到了Flow的下游部分,这里可以使用终止操作符来终止整个Flow,当使用了终止操作符,我们再也无法使用map这种中间操作符了。

最常用的就是collect终止操作符,代表数据流的终止。同时还有一些从集合中抄过来的操作符,比如first()single()foldreduce等等。

另外把Flow转换为集合的时候,本身也意味着Flow数据流的终止,比如toList,也算是终止操作符。

Flow是冷的

上一篇文章我们说了Channel即管道这个是热的,特点就像是热情的服务员,不管你想不想喝水,都会给你端茶递水,那这里的Flow其实也非常好理解,我们看个代码:

fun main() = runBlocking {
    // 冷数据流
    val flow = flow {
        (1..3).forEach {
            println("Before send $it")
            emit(it)
            println("Send $it")
        }
    }

    // 热数据流
    val channel = produce<Int>(capacity = 0) {
        (1..3).forEach {
            println("Before send $it")
            send(it)
            println("Send $it")
        }
    }

    println("end")
}

上面代码的打印:

可以发现:Channel之所以认为是热的,是因为不管有没有接收方,发送方都会工作;而Flow冷的原因是,只有调用终止操作符以后,Flow才会工作。

Flow还是懒的

默认情况下,Flow不仅是冷的,还是懒的,比如下面代码:

fun main() = runBlocking {
    flow {
        println("emit: 3")
        emit(3)
        println("emit: 4")
        emit(4)
        println("emit: 5")
        emit(5)
    }.filter {
        println("filter: $it")
        it > 2
    }.map {
        println("map: $it")
        it * 2
    }.collect {
        println("collect: $it")
    }
}

这里的执行结果如下:

image.png

会发现它一次只处理一个数据,结合上一篇文章的服务员端茶倒水的例子,就是Flow不仅是一个冷淡的服务员,还是一个懒惰的服务员,明明饭桌上有3个人需要喝水,但服务员偏偏不一次递上3杯水,而是每个人都叫服务员一次,服务员才一杯一杯地把水递过来。

总结

这篇文章内容很多,也是Kotlin的一个重点,Flow在Android中有很多地方使用,等后面说一下Flow的原理,再来讨论在Android中如何使用Flow

总体来说,FlowChannel一样,都是用来协程间的通信的,可以让我们更方便的处理复杂逻辑。

整个Flow的API设计可以分为上游、中间操作符和下游操作符:

  • 上游:主要负责创建Flow,同时产生数据,主要有3个API:flow{}flowOf()asFlow();
  • 中间操作符:可以分为4类,第一类是集合抄过来的操作符;第二类是生命周期回调,比如onStartonCompletion;第三类是捕获异常catch操作符;第四类是切换context,比如flowOnlaunchIn
  • 下游终止操作符:可以分为3类,第一个就是collect;第二类就是集合抄过来的;第三类就是Flow转集合的API,比如flow.toList()

最后就是和Channel对比,说Flow为什么是冷的,以及优势和劣势。

image.png


阅读量:939

点赞量:0

收藏量:0