协程(8) | Channel-灵析社区

德州安卓

前言

在前面介绍中,我们介绍了几种启动协程的方法,以及挂起函数,在使用async启动的协程中,我们可以通过await方法来获取协程的返回值。

但是这些返回值都是只有一个,即当挂起的函数或者协程,在挂起后重新恢复时,只返回了一个结果。在普通业务中,这种模式是可行的,但是有些特殊业务,比如:前台应用不断更新手机GPS返回的经纬度,如果使用回调的话,我们可以非常容易实现,但是想用协程以同步方式优雅地实现,就需要引入新地东西,即本章的Channel可以解决。

正文

Channel直接翻译就是管道,我们可以利用这个概念来简单构建一个思维模型:

通过这个思维模型,我们可以把Channel看成是一个封闭的管道,它只有发送方和接收方,数据从发送方发出可以被接收方收到,中间不会被修改。

Channel的使用

我们先来看看Channel的简单使用,实例代码如下:

fun main() = runBlocking {
    //创建一个管道,可以传递Int类型的值
    val channel = Channel<Int>()
    //开启一个协程
    launch {
        //在该协程中发送消息
        channel.send(1)
        logX("Send 1")
        channel.send(2)
        logX("Send 2")
        channel.send(3)
        logX("Send 3")
    }
    //开启一个协程
    launch {
        //在另一个协程中接收管道消息
        //这里之所以可以写出for循环的代码,这里也是根据Kotlin的约定,即运算符重载,重载了iterator(),
        //这个在集合篇中有详细说过
        for (i in channel){
            logX("Receive: $i")
        }
    }
    logX("end")
}

在之前学习中,我们说过launch适合启动一些一劳永逸、不需要返回结果的任务协程,但是在这里,我们获取协程里面的代码执行结果而不必那么费劲了,必须使用async了,这里的Channel就像是一个管道,管道的俩头分别在俩个协程中,即Channel可以轻松实现协程间的通信。

在上面代码中,首先就是接收数据的协程中,使用for循环来获取管道的值,这个是运算符重载的简写。其次,俩个协程都是运行在runBlocking启动的协程中,特点是会阻塞线程且会等待其子协程运行完成。

我们来看一下上面代码运行结果:

这里我们可以发现end先打印,这时因为launch启动的协程就如射箭,它们不会阻塞父协程代码执行。然后就是ReceiveSend的打印交替执行,甚至还出现了Receive 1Send 1先打印(打印的代码会被挂起),这是因为这俩个协程是交替执行的,send发送数据函数和接收数据函数是挂起函数

挂起函数的本质是Callback,且挂起的是协程后面的代码,在上面代码默认情况下:调用完channel.send(1),发送数据的协程就会挂起,等待接收协程把管道中的数据取走后,再恢复打印log和调用channel.send(2)

理解了挂起函数后,我相信上面利用挂起函数来实现的交替执行都不难理解了。

但是从打印中也会发现一个问题,就是程序在输出完所有结果后并没有退出,即主线程不会结束,整个程序还会处于运行状态。

关闭Channel

出现这个原因是因为Channel无法知道数据有没有发送完,依旧会挂起等待,想解决上面问题也非常简单,发送完关闭这个Channel即可:

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        (1 .. 3).forEach{
            channel.send(it)
            logX("Send $it")
        }
        //这里加一句代码,发送完数据,及时关闭channel
        channel.close()
    }
    launch {
        for (i in channel){
            logX("Receive: $i")
        }
    }
    logX("end")
}

所以从这里可以看出Channel其实是一种协程资源,在使用完Channel以后,如果不主动关闭的话,会造成不必要的资源浪费。

Channel简析

现在我们来分析一下Channel的“构造函数”,来看看如何修改参数来达到不同的效果。代码如下:

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>

可以看到当调用Channel()时看着像是在调用一个构造函数,其实它却是一个顶层函数,注意这里使用比较特殊,是把顶层函数当作构造函数来用,在这种情况下,函数首字母需要大写

这个函数带有一个泛型参数E,另外还有3个参数,我们一一来看一下这3个参数。

capacity

这个代表管道的容量,这个非常好理解,就比如我们生活中见到的管道也是有容量的,当接收方不把数据取走时,管道本身也可以保存一些数据。

默认情况下是RENDEZVOUS,表示Channel的容量是0。capacity还有其他几种情况,分别如下:

  • UNLIMITED:代表无限容量;
  • CONFLATED:代表容量为1,新的数据会替代旧的数据
  • BUFFRED:代表一定缓存容量,默认是64;

这里的默认值RENDEZOUS,很有意思,它的翻译是约会、会面的意思。

onBufferOverflow

这个是当指定了capacity的容量,等Channel的容量满了之后,Channel所应对的策略,这里主要有3种做法:

  • SUSPEND:当管道的容量满了以后,如果发送方继续发送数据,我们会挂起当前的send()方法。由于它是一个挂起函数,所以我们可以非阻塞的方式将发送方的流程挂起,等管道容量有空闲位置以后再恢复。这个逻辑非常好理解,就和Java实现的阻塞队列一样。
  • DROP_OLDEST:顾名思义,就是丢弃掉最旧的那个数据;
  • DROP_LATEST丢掉最新的数据,这里指还没有进入管道的数据;

这里有一张图,可以更好地表达上面几种做法:

onUndeliveredElement

这个相当于一个异常处理回调,当管道种某些数据没有被成功接收的时候,这个回调就会被调用。这里其实也需要注意,是数据已经发送了,但是没有被接收,才会触发回调;而不是管道满了,把数据丢了,这种情况不会触发回调。

这种只有非常在意数据是否传输正常的业务,才会用到这个回调。

测试例子

说完上面各个参数的作用,我们来看一些例子。

  1. 设置容量capacity = UNLIMITED,即容量无限,代码如下:
fun main() = runBlocking {
    //创建管道,容量设置为无限容量
    val channel = Channel<Int>(capacity = Channel.Factory.UNLIMITED)

    launch {
        //在一个单独地协程中发送管道消息
        channel.send(1)
        logX("Send 1")
        channel.send(2)
        logX("Send 2")
        channel.send(3)
        logX("Send 3")
    }

    launch {
        //在另一个协程中接收管道消息
        for (i in channel){
            logX("Receive: $i")
        }
    }
    logX("end")
}

这里的代码只改变一点,就是容量无限大,然后onBufferOverflow策略不变,我们可以想一下,第一个子协程在运行时,会发现管道容量很大,所以3个send()方法执行完后,管道还没有满,协程不会被挂起,子协程1执行完成。

然后协程2开始遍历取出channel中的数据,所以结果如下:

是先连续发送3个,再挨个接收3个。

  1. 设置capacity = CONFLATED,即容量为1,但是根据定义,新的数据会替代旧的数据,这个还是要注意的,这个和你自己设置容量为1的情况大不相同,比如下面代码是设置capacity = CONFLATED的代码和运行结果:
//创建管道,容量为 CONFLATED
val channel = Channel<Int>(capacity = Channel.Factory.CONFLATED)

代码运行结果如下:

这里会发现即使channel已经满了,但是依旧会发送数据,而不是挂起,所以会发现3个数据,接收方只接收了一个最新值。

下面代码仅仅是设置capacity = 1的情况:

//创建管道
val channel = Channel<Int>(capacity = 1)

代码运行结果如下:

这里就符合正常的管道容量为1且默认满的时候是挂起的逻辑了。

所以当设置为CONFLATED时,onBufferOverflow的挂起将会失效,会替换新值。

  1. 这里我们结合onBufferOverflow策略,当使用DROP_OLDEST策略即丢弃最老值和同时设置capacity值为1时,就实现了capacityCONFLATED一样的效果:
val channel = Channel<Int>(capacity = 1, 
    onBufferOverflow = BufferOverflow.DROP_OLDEST)

这里我们可以想一下,为什么capacity需要搞一个CONFLATED这个模式呢

因为在我们的业务中,经常会出现一个场景就是我只需要最新的值;比如Android中的页面UI数据,在页面在后台时,我们就可以把无用数据丢弃,只需要最新值即可。所以CONFLATED非常合理。

  1. 再看一下onBufferOverflow使用DROP_LATEST策略,这里注意使用这个策略,就不会再出现发送方挂起的情况了,所以我们在日常使用时尽量多使用SUSPEND默认策略,下面是使用DROP_LATEST策略:
fun main() = runBlocking {
    //创建管道,容量为3,当管道满的时候,会丢弃最新的
    val channel = Channel<Int>(capacity = 3,
        onBufferOverflow = BufferOverflow.DROP_LATEST)

    launch {
        //在一个单独地协程中发送管道消息
        channel.send(1)
        logX("Send 1")
        channel.send(2)
        logX("Send 2")
        channel.send(3)
        logX("Send 3")
        channel.send(4) //会被丢弃
        logX("Send 4")
    }

    launch {
        //在另一个协程中接收管道消息
        for (i in channel) {
            logX("Receive: $i")
        }
    }
    logX("end")
}

比如上面代码容量为3的管道,发送了4个数据,数据4会被丢弃,结果打印如下:

在这种情况下,就会发现这个管道只是用来连接2个协程通信而已,不会有挂起、恢复等操作,在实际业务中,这种使用场景可能不会太多。

  1. 最后看看这个onUndeliveredElement这个参数,就是当消息没有传递成功的回调,比如下面代码:
//创建管道,容量为3,背压策略是丢弃最新的,未传递策略是打印
val channel = Channel<Int>(capacity = 3,
    onBufferOverflow = BufferOverflow.DROP_LATEST){
    println("传递失败 $it")
}

launch {
    //在一个单独地协程中发送管道消息
    channel.send(1)
    logX("Send 1")
    channel.send(2)
    logX("Send 2")
    channel.send(3)
    logX("Send 3")
    channel.send(4)
    logX("Send 4")
}

launch {
    //在另一个协程中接收管道消息
    channel.receive()
    channel.cancel()
}

我们在一个子协程上发送了4个数据,但是管道容量为3,所以数据4直接会被丢弃,这个不算是传递失败,然后在另一个子协程中就receive()了一次,然后取消管道,这就导致有2个会传递失败,运行打印如下:

Channel 关闭引发的问题

在前面代码中我们一直有个问题没有解决,就是使用channel,假如忘记调用close(),会导致程序一直无法终止。

produce{}生产数据

Kotlin的官方也想到了这个问题,所以提供了另一种创建Channel的方式,即produce{}函数。

fun main() = runBlocking {
    //创建管道
    val channel = produce {
        (1 .. 3).forEach {
            send(it)
            logX("Send $it")
        }
    }

    launch {
        //在另一个协程中接收管道消息
        for (i in channel){
            logX("Receive $i")
        }
    }
    logX("end")
}

这里使用produce高阶函数创建了一个Channel,并且发送了3个数据,然后该代码执行结果如下:

可以看出程序可以正常退出。

receive()

前面我们从Channel中取出数据用的是for循环遍历的方式,其实还有一个方法:receive(),它是和send()方法对应的,比如上面代码我们把for循环遍历取出数据的方式改成receive(),并且一次性调用4次:

launch {
    //在另一个协程中接收管道消息
    channel.receive()
    channel.receive()
    channel.receive()
    channel.receive()
}

这里由于我们发了3个数据,因为produce特性发完3个后会关闭Channel,所以前面代码报出异常:

抛出了这个异常,也就说明Channel确实被关闭了。

同时我们发现receive()是一个挂起函数,它的定义是当Channel不为空时取出一个数据,而当Channel为空时,会挂起,所以直接使用receive()函数时容易会导致程序永久挂起。

isClosedForReceive和isClosedForSend

那我就想使用receive()函数呢 这里有2个函数isClosedForReceiveisClosedForSend可以判断在发送时和接收时Channel是否关闭。比如下面代码:

fun main() = runBlocking {
    //创建管道
    val channel = produce {
        (1 .. 3).forEach {
            send(it)
            logX("Send $it")
        }
    }

    launch {
        //在另一个协程中接收管道消息
        while (!channel.isClosedForReceive){
            println("Receive ${channel.receive()}")
        }
    }
    logX("end")
}

上面代码看起来天衣无缝,在接收协程中,循环判断Channel是否关闭,但是结果运行如下:

会发现还是会崩溃,所以最好不要使用channel.receive(),即使配合isClosedForReceive()也不要使用。

consumeEach{}消费数据

那有没有除了for循环其他的方式呢,这里Kotlin为我们提供了一个高阶函数:channel.consumeEach{},我们来看下面的例子:

fun main() = runBlocking {
    //创建管道,使用produce生产数据
    val channel = produce {
        (1 .. 3).forEach {
            send(it)
            logX("Send $it")
        }
    }

    launch {
        //使用consumeEach消费数据
        channel.consumeEach {
            print("Receive $it")
        }
    }
    logX("end")
}

上面代码就不会出现异常。

综上所述,当使用Channel时,我们建议发送方使用produce{}高阶函数,接收方使用for循环或者consumeEach{}高阶函数

Channel是"热的"

前面介绍我们知道Channel就是一个管道,而管道中是数据流,是多个数据组合形成流。如果把挂起函数、async返回的数据比喻成水滴,那channel则像是自来水管。

在Kotlin中我们经常拿ChannelFlow做比较,而说Channel是"热"的,那这个"热"该如何理解呢?

热情、主动

其实我觉得可以直接翻译为热情、主动的意思,比如我们可以下面代码:

fun main() = runBlocking {
    //创建管道
    val channel = produce(capacity = 10) {
        (1 .. 3).forEach {
            send(it)
            logX("Send $it")
        }
    }

    logX("end")
}

我们定义了一个容量为10的管道,然后发送数据,但是我们没有接收数据,这里的运行结果如下:

依旧会让管道中发送消息,而这种不管有没有接收方,发送方都会工作"的模式,我们就认为是"热"的原因。

这里就可以类比为热情的饭店服务员,不管你有没有提出要求,服务员都会给你端茶递水,把茶水摆在饭桌上,想喝水的时候,直接从桌上拿即可。

也可以类比为前面说的水龙头Channel的发送方就好比是自来水厂,不管用不用水,自来水厂都会把水送到管道中来,当想喝水的时候,打开水龙头就能喝到水。

总结

Channel的出现给我们在协程之间通信提供了便利性,这里我们把Channel看成一条密闭的管道,通过produce{}consumeEach{}来进行产生和消费数据。

同时Channel的默认挂起策略,可以让我们更方便的处理数据。最后就是相比于FlowChannel是热的,也就是不论消费者是否工作,它都会发送数据,会导致资源浪费。


阅读量:1452

点赞量:0

收藏量:0