flowOn()
指定调度线程public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
checkFlowContext(context)
return when {
context == EmptyCoroutineContext -> this
this is FusibleFlow -> fuse(context = context)
else -> ChannelFlowOperatorImpl(this, context = context)
}
}
传入的参数类型为一个协程上下文元素之一的分发器,指定flowOn上面的代码块执行所在的线程环境:
即指定上面红框中代码块执行的线程环境,不包括下面,看下日志打印:
flattenConcat
串行展平流public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
collect { value -> emitAll(value) }
}
展平流,也就是将Flow<Flow<T>>类型的嵌套流展平成Flow<T>类型的流,请注意这个flattenConcat()方法属于串行的展平流,也就是说,只有执行完上一个流之后,才回去执行下一个流中的flow代码块逻辑。
fun flat() {
GlobalScope.launch {
flowOf(
flow {
println("flow1")
delay(1000)
emit(10)
},
flow {
println("flow2")
emit(20)
}
).flattenConcat().collect {
println("flat collect: $it")
}
}
}
输出结果:
可以看到,即使第一个flow中挂起了1s,也不会执行第二个flow中的逻辑,只有第一个flow执行完毕才会轮到第二个flow开始执行。
还有个flatMapConcat()方法,展平的时候可以变换发送的数据类型,这里不做过多介绍,就是map()和flattenConcat的结合。
flattenMerge
并行展平流public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}
和flattenConcat()的区别就是,要展平的流之前是并行执行的:
fun flat() {
GlobalScope.launch {
flowOf(
flow {
println("flow1")
delay(1000)
emit(10)
},
flow {
println("flow2")
delay(2000)
emit(20)
},
flow {
println("flow3")
emit(30)
}
).flattenMerge().collect {
println("flat collect: $it")
}
}
}
看下输出结果:
当flow1和flow2调用delay
挂起就去执行flow3,几个流之间相互都是并行执行的。不过flattenMerge
方法有个参数concurrency
执行并行执行的流的个数,默认16个:
如果我们将上面的flattenMerge(2)传入2看下执行效果:
可以看到,有限flow1和flow2代码块中的逻辑处理完再执行flow3的代码块逻辑。
还有个flatMapMerge()方法,展平的时候可以变换发送的数据类型,这里不做过多介绍,就是map()和flattenMerge的结合。
buffer()
背压支持public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
// create a flow
return when (this) {
is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)
else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)
}
}
这个扩展函数主要是解决上游流的发送速度超过下游流的处理速度的场景,有点类似与rxjava中的背压。并且提供了三种策略:挂起、丢弃最新、丢弃最久:
fun flat() {
GlobalScope.launch {
flow {
println("flow2")
for (i in 0..100) {
emit(i)
}
}.buffer(1, BufferOverflow.DROP_OLDEST).collect {
println("flat collect: $it")
}
}
}
指的BufferOverflow.DROP_OLDEST丢弃最旧模式的运行结果:
阅读量:405
点赞量:0
收藏量:0