在前面介绍中,我们介绍了几种启动协程的方法,以及挂起函数,在使用async
启动的协程中,我们可以通过await
方法来获取协程的返回值。
但是这些返回值都是只有一个,即当挂起的函数或者协程,在挂起后重新恢复时,只返回了一个结果。在普通业务中,这种模式是可行的,但是有些特殊业务,比如:前台应用不断更新手机GPS
返回的经纬度,如果使用回调的话,我们可以非常容易实现,但是想用协程以同步方式优雅地实现,就需要引入新地东西,即本章的Channel
可以解决。
Channel
直接翻译就是管道,我们可以利用这个概念来简单构建一个思维模型:
通过这个思维模型,我们可以把Channel
看成是一个封闭的管道,它只有发送方和接收方,数据从发送方发出可以被接收方收到,中间不会被修改。
我们先来看看Channel
的简单使用,实例代码如下:
fun main() = runBlocking {
//创建一个管道,可以传递Int类型的值
val channel = Channel<Int>()
//开启一个协程
launch {
//在该协程中发送消息
channel.send(1)
logX("Send 1")
channel.send(2)
logX("Send 2")
channel.send(3)
logX("Send 3")
}
//开启一个协程
launch {
//在另一个协程中接收管道消息
//这里之所以可以写出for循环的代码,这里也是根据Kotlin的约定,即运算符重载,重载了iterator(),
//这个在集合篇中有详细说过
for (i in channel){
logX("Receive: $i")
}
}
logX("end")
}
在之前学习中,我们说过launch
适合启动一些一劳永逸、不需要返回结果的任务协程,但是在这里,我们获取协程里面的代码执行结果而不必那么费劲了,必须使用async
了,这里的Channel
就像是一个管道,管道的俩头分别在俩个协程中,即Channel
可以轻松实现协程间的通信。
在上面代码中,首先就是接收数据的协程中,使用for循环来获取管道的值,这个是运算符重载的简写。其次,俩个协程都是运行在runBlocking
启动的协程中,特点是会阻塞线程且会等待其子协程运行完成。
我们来看一下上面代码运行结果:
这里我们可以发现end
先打印,这时因为launch
启动的协程就如射箭,它们不会阻塞父协程代码执行。然后就是Receive
和Send
的打印交替执行,甚至还出现了Receive 1
比Send 1
先打印(打印的代码会被挂起),这是因为这俩个协程是交替执行的,send
发送数据函数和接收数据函数是挂起函数。
挂起函数的本质是Callback
,且挂起的是协程后面的代码,在上面代码默认情况下:调用完channel.send(1)
,发送数据的协程就会挂起,等待接收协程把管道中的数据取走后,再恢复打印log
和调用channel.send(2)
。
理解了挂起函数后,我相信上面利用挂起函数来实现的交替执行都不难理解了。
但是从打印中也会发现一个问题,就是程序在输出完所有结果后并没有退出,即主线程不会结束,整个程序还会处于运行状态。
Channel
出现这个原因是因为Channel
无法知道数据有没有发送完,依旧会挂起等待,想解决上面问题也非常简单,发送完关闭这个Channel
即可:
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
(1 .. 3).forEach{
channel.send(it)
logX("Send $it")
}
//这里加一句代码,发送完数据,及时关闭channel
channel.close()
}
launch {
for (i in channel){
logX("Receive: $i")
}
}
logX("end")
}
所以从这里可以看出Channel
其实是一种协程资源,在使用完Channel
以后,如果不主动关闭的话,会造成不必要的资源浪费。
现在我们来分析一下Channel
的“构造函数”,来看看如何修改参数来达到不同的效果。代码如下:
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>
可以看到当调用Channel()
时看着像是在调用一个构造函数,其实它却是一个顶层函数,注意这里使用比较特殊,是把顶层函数当作构造函数来用,在这种情况下,函数首字母需要大写。
这个函数带有一个泛型参数E
,另外还有3个参数,我们一一来看一下这3个参数。
这个代表管道的容量,这个非常好理解,就比如我们生活中见到的管道也是有容量的,当接收方不把数据取走时,管道本身也可以保存一些数据。
默认情况下是RENDEZVOUS
,表示Channel
的容量是0。capacity
还有其他几种情况,分别如下:
UNLIMITED
:代表无限容量;CONFLATED
:代表容量为1,新的数据会替代旧的数据;BUFFRED
:代表一定缓存容量,默认是64;这里的默认值RENDEZOUS
,很有意思,它的翻译是约会、会面的意思。
这个是当指定了capacity
的容量,等Channel
的容量满了之后,Channel
所应对的策略,这里主要有3种做法:
SUSPEND
:当管道的容量满了以后,如果发送方继续发送数据,我们会挂起当前的send()
方法。由于它是一个挂起函数,所以我们可以非阻塞的方式将发送方的流程挂起,等管道容量有空闲位置以后再恢复。这个逻辑非常好理解,就和Java实现的阻塞队列一样。DROP_OLDEST
:顾名思义,就是丢弃掉最旧的那个数据;DROP_LATEST
丢掉最新的数据,这里指还没有进入管道的数据;这里有一张图,可以更好地表达上面几种做法:
这个相当于一个异常处理回调,当管道种某些数据没有被成功接收的时候,这个回调就会被调用。这里其实也需要注意,是数据已经发送了,但是没有被接收,才会触发回调;而不是管道满了,把数据丢了,这种情况不会触发回调。
这种只有非常在意数据是否传输正常的业务,才会用到这个回调。
说完上面各个参数的作用,我们来看一些例子。
capacity = UNLIMITED
,即容量无限,代码如下:fun main() = runBlocking {
//创建管道,容量设置为无限容量
val channel = Channel<Int>(capacity = Channel.Factory.UNLIMITED)
launch {
//在一个单独地协程中发送管道消息
channel.send(1)
logX("Send 1")
channel.send(2)
logX("Send 2")
channel.send(3)
logX("Send 3")
}
launch {
//在另一个协程中接收管道消息
for (i in channel){
logX("Receive: $i")
}
}
logX("end")
}
这里的代码只改变一点,就是容量无限大,然后onBufferOverflow
策略不变,我们可以想一下,第一个子协程在运行时,会发现管道容量很大,所以3个send()
方法执行完后,管道还没有满,协程不会被挂起,子协程1执行完成。
然后协程2开始遍历取出channel
中的数据,所以结果如下:
是先连续发送3个,再挨个接收3个。
capacity = CONFLATED
,即容量为1,但是根据定义,新的数据会替代旧的数据,这个还是要注意的,这个和你自己设置容量为1的情况大不相同,比如下面代码是设置capacity = CONFLATED
的代码和运行结果://创建管道,容量为 CONFLATED
val channel = Channel<Int>(capacity = Channel.Factory.CONFLATED)
代码运行结果如下:
这里会发现即使channel
已经满了,但是依旧会发送数据,而不是挂起,所以会发现3个数据,接收方只接收了一个最新值。
下面代码仅仅是设置capacity = 1
的情况:
//创建管道
val channel = Channel<Int>(capacity = 1)
代码运行结果如下:
这里就符合正常的管道容量为1且默认满的时候是挂起的逻辑了。
所以当设置为CONFLATED
时,onBufferOverflow
的挂起将会失效,会替换新值。
onBufferOverflow
策略,当使用DROP_OLDEST
策略即丢弃最老值和同时设置capacity
值为1时,就实现了capacity
为CONFLATED
一样的效果:val channel = Channel<Int>(capacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST)
这里我们可以想一下,为什么capacity
需要搞一个CONFLATED
这个模式呢
因为在我们的业务中,经常会出现一个场景就是我只需要最新的值;比如Android中的页面UI数据,在页面在后台时,我们就可以把无用数据丢弃,只需要最新值即可。所以CONFLATED
非常合理。
onBufferOverflow
使用DROP_LATEST
策略,这里注意使用这个策略,就不会再出现发送方挂起的情况了,所以我们在日常使用时尽量多使用SUSPEND
默认策略,下面是使用DROP_LATEST
策略:fun main() = runBlocking {
//创建管道,容量为3,当管道满的时候,会丢弃最新的
val channel = Channel<Int>(capacity = 3,
onBufferOverflow = BufferOverflow.DROP_LATEST)
launch {
//在一个单独地协程中发送管道消息
channel.send(1)
logX("Send 1")
channel.send(2)
logX("Send 2")
channel.send(3)
logX("Send 3")
channel.send(4) //会被丢弃
logX("Send 4")
}
launch {
//在另一个协程中接收管道消息
for (i in channel) {
logX("Receive: $i")
}
}
logX("end")
}
比如上面代码容量为3的管道,发送了4个数据,数据4会被丢弃,结果打印如下:
在这种情况下,就会发现这个管道只是用来连接2个协程通信而已,不会有挂起、恢复等操作,在实际业务中,这种使用场景可能不会太多。
onUndeliveredElement
这个参数,就是当消息没有传递成功的回调,比如下面代码://创建管道,容量为3,背压策略是丢弃最新的,未传递策略是打印
val channel = Channel<Int>(capacity = 3,
onBufferOverflow = BufferOverflow.DROP_LATEST){
println("传递失败 $it")
}
launch {
//在一个单独地协程中发送管道消息
channel.send(1)
logX("Send 1")
channel.send(2)
logX("Send 2")
channel.send(3)
logX("Send 3")
channel.send(4)
logX("Send 4")
}
launch {
//在另一个协程中接收管道消息
channel.receive()
channel.cancel()
}
我们在一个子协程上发送了4个数据,但是管道容量为3,所以数据4直接会被丢弃,这个不算是传递失败,然后在另一个子协程中就receive()
了一次,然后取消管道,这就导致有2个会传递失败,运行打印如下:
Channel
关闭引发的问题在前面代码中我们一直有个问题没有解决,就是使用channel
,假如忘记调用close()
,会导致程序一直无法终止。
produce{}
生产数据Kotlin的官方也想到了这个问题,所以提供了另一种创建Channel
的方式,即produce{}
函数。
fun main() = runBlocking {
//创建管道
val channel = produce {
(1 .. 3).forEach {
send(it)
logX("Send $it")
}
}
launch {
//在另一个协程中接收管道消息
for (i in channel){
logX("Receive $i")
}
}
logX("end")
}
这里使用produce
高阶函数创建了一个Channel
,并且发送了3个数据,然后该代码执行结果如下:
可以看出程序可以正常退出。
receive()
前面我们从Channel
中取出数据用的是for
循环遍历的方式,其实还有一个方法:receive()
,它是和send()
方法对应的,比如上面代码我们把for
循环遍历取出数据的方式改成receive()
,并且一次性调用4次:
launch {
//在另一个协程中接收管道消息
channel.receive()
channel.receive()
channel.receive()
channel.receive()
}
这里由于我们发了3个数据,因为produce特性发完3个后会关闭Channel
,所以前面代码报出异常:
抛出了这个异常,也就说明Channel
确实被关闭了。
同时我们发现receive()
是一个挂起函数,它的定义是当Channel
不为空时取出一个数据,而当Channel
为空时,会挂起,所以直接使用receive()
函数时容易会导致程序永久挂起。
isClosedForReceiv
e和isClosedForSend
那我就想使用receive()
函数呢 这里有2个函数isClosedForReceive
和isClosedForSend
可以判断在发送时和接收时Channel
是否关闭。比如下面代码:
fun main() = runBlocking {
//创建管道
val channel = produce {
(1 .. 3).forEach {
send(it)
logX("Send $it")
}
}
launch {
//在另一个协程中接收管道消息
while (!channel.isClosedForReceive){
println("Receive ${channel.receive()}")
}
}
logX("end")
}
上面代码看起来天衣无缝,在接收协程中,循环判断Channel
是否关闭,但是结果运行如下:
会发现还是会崩溃,所以最好不要使用channel.receive()
,即使配合isClosedForReceive()
也不要使用。
consumeEach{}
消费数据那有没有除了for循环其他的方式呢,这里Kotlin为我们提供了一个高阶函数:channel.consumeEach{}
,我们来看下面的例子:
fun main() = runBlocking {
//创建管道,使用produce生产数据
val channel = produce {
(1 .. 3).forEach {
send(it)
logX("Send $it")
}
}
launch {
//使用consumeEach消费数据
channel.consumeEach {
print("Receive $it")
}
}
logX("end")
}
上面代码就不会出现异常。
综上所述,当使用Channel
时,我们建议发送方使用produce{}高阶函数,接收方使用for循环或者consumeEach{}高阶函数
。
前面介绍我们知道Channel
就是一个管道,而管道中是数据流,是多个数据组合形成流。如果把挂起函数、async
返回的数据比喻成水滴,那channel
则像是自来水管。
在Kotlin中我们经常拿Channel
和Flow
做比较,而说Channel
是"热"的,那这个"热"该如何理解呢?
其实我觉得可以直接翻译为热情、主动的意思,比如我们可以下面代码:
fun main() = runBlocking {
//创建管道
val channel = produce(capacity = 10) {
(1 .. 3).forEach {
send(it)
logX("Send $it")
}
}
logX("end")
}
我们定义了一个容量为10的管道,然后发送数据,但是我们没有接收数据,这里的运行结果如下:
依旧会让管道中发送消息,而这种不管有没有接收方,发送方都会工作"的模式,我们就认为是"热"的原因。
这里就可以类比为热情的饭店服务员,不管你有没有提出要求,服务员都会给你端茶递水,把茶水摆在饭桌上,想喝水的时候,直接从桌上拿即可。
也可以类比为前面说的水龙头,Channel
的发送方就好比是自来水厂,不管用不用水,自来水厂都会把水送到管道中来,当想喝水的时候,打开水龙头就能喝到水。
Channel
的出现给我们在协程之间通信提供了便利性,这里我们把Channel
看成一条密闭的管道,通过produce{}
和consumeEach{}
来进行产生和消费数据。
同时Channel
的默认挂起策略,可以让我们更方便的处理数据。最后就是相比于Flow
,Channel
是热的,也就是不论消费者是否工作,它都会发送数据,会导致资源浪费。
阅读量:1452
点赞量:0
收藏量:0