这些flow常见API的使用,你一定需要掌握!(二)-灵析社区

德州安卓

flowOn()指定调度线程

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    checkFlowContext(context)
    return when {
        context == EmptyCoroutineContext -> this
        this is FusibleFlow -> fuse(context = context)
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}

传入的参数类型为一个协程上下文元素之一的分发器,指定flowOn上面的代码块执行所在的线程环境:

即指定上面红框中代码块执行的线程环境,不包括下面,看下日志打印:

flattenConcat串行展平流

public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
    collect { value -> emitAll(value) }
}

展平流,也就是将Flow<Flow<T>>类型的嵌套流展平成Flow<T>类型的流,请注意这个flattenConcat()方法属于串行的展平流,也就是说,只有执行完上一个流之后,才回去执行下一个流中的flow代码块逻辑。

fun flat() {
    GlobalScope.launch {
        flowOf(
            flow {
                println("flow1")
                delay(1000)
                emit(10)
            },
            flow {
                println("flow2")
                emit(20)
            }
        ).flattenConcat().collect {
            println("flat collect: $it")
        }
    }
}

输出结果:

可以看到,即使第一个flow中挂起了1s,也不会执行第二个flow中的逻辑,只有第一个flow执行完毕才会轮到第二个flow开始执行。

还有个flatMapConcat()方法,展平的时候可以变换发送的数据类型,这里不做过多介绍,就是map()flattenConcat的结合。

flattenMerge并行展平流

public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}

flattenConcat()的区别就是,要展平的流之前是并行执行的:

fun flat() {
    GlobalScope.launch {
        flowOf(
            flow {
                println("flow1")
                delay(1000)
                emit(10)
            },
            flow {
                println("flow2")
                delay(2000)
                emit(20)
            },
            flow {
                println("flow3")
                emit(30)
            }
        ).flattenMerge().collect {
            println("flat collect: $it")
        }
    }
}

看下输出结果:

当flow1和flow2调用delay挂起就去执行flow3,几个流之间相互都是并行执行的。不过flattenMerge方法有个参数concurrency执行并行执行的流的个数,默认16个:

如果我们将上面的flattenMerge(2)传入2看下执行效果:

可以看到,有限flow1和flow2代码块中的逻辑处理完再执行flow3的代码块逻辑。

还有个flatMapMerge()方法,展平的时候可以变换发送的数据类型,这里不做过多介绍,就是map()和flattenMerge的结合。

buffer()背压支持

public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
    // create a flow
    return when (this) {
        is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)
        else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)
    }
}

这个扩展函数主要是解决上游流的发送速度超过下游流的处理速度的场景,有点类似与rxjava中的背压。并且提供了三种策略:挂起丢弃最新丢弃最久:

fun flat() {
    GlobalScope.launch {
        flow {
            println("flow2")
            for (i in 0..100) {
                emit(i)
            }
        }.buffer(1, BufferOverflow.DROP_OLDEST).collect {
            println("flat collect: $it")
        }
    }
}

指的BufferOverflow.DROP_OLDEST丢弃最旧模式的运行结果:

阅读量:405

点赞量:0

收藏量:0