深入理解Kotlin协程-灵析社区

IT大鲨鱼

Kotlin协程

协程由程序自己创建和调度,不需要操作系统调度,所以协程比线程更加轻量。相比于线程的切换,协程切换开销更小,速度更快

我们知道线程是CPU调度的基本单位,而协程不能独立存在,必须依赖于线程。在Kotlin中,Dispatcher(内部是线程池或者Android主线程)是调度器,可以调度协程运行在一个或多个线程之中。一个线程中可以有多个协程,同一个协程可以运行在一个线程的不同时刻;多个协程可以运行在一个或多个线程的不同时刻。进程、线程、协程三者的关系:

· 进程:线程 = 1:N

· 线程:协程 = 1:N

协程优势:

· 更安全的代码:kotlin中提供了许多语言功能,避免Java中最常见的null空指针等异常

· 语法简洁、富有表现力:相比于Java,kotlin可以使用更少的代码实现更多的功能。

· 可互操作:与Java语言无缝互通。即可以在 kotlin 代码中调用Java代码,同时也可以在Java代码中调用kotlin代码。kotlin代码本质上也是通过kotlin编译器编译后生成VM能识别的字节码。

· 结构化并发:使用看似阻塞式的写法来实现异步功能。即可以同步的方式去写异步代码,相比于回调方式大幅简化了后台任务管理,例如网络请求、数据库访问等任务的管理。

Suspend 非阻塞式挂起函数

协程在常规函数的基础上添加了两项操作,用于处理长时间运行的任务。在 invoke(或call)和return之外,协程添加了 suspend 和 resume:

· suspend 用于暂停执行当前协程,并保存所有局部变量。

· resume 用于让已挂起的协程从挂起处继续执行。

suspend 函数不能在普通函数中调用,否则会报Suspend function 'xxx' should be called only from a coroutine or another suspend function 的提示;如需调用 suspend 函数,只能从其他 suspend 函数进行调用,或通过使用协程构建器(例如 launch)来启动新的协程。

suspend 函数相比于普通函数内部多了一个 Continuation 续体实例(Kotlin转成Java代码之后即可看到),suspend函数中可以调用普通函数,但普通函数却不能调用suspend函数

 suspend fun fetchDocs() {                             // Dispatchers.Main    val result = get("https://developer.android.com") // Dispatchers.IO for `get`    show(result)                                      // Dispatchers.Main}suspend fun get(url: String) = withContext(Dispatchers.IO) { /* ... */ }

suspend 修饰的函数为非阻塞式挂起函数,何为非阻塞式挂起呢?不同于Java 中的Thread.sleep() 会阻塞当前线程,suspend 修饰的函数当遇到启动子线程操作时,会在切线程时将协程挂起并记录当前挂起点,接着主线程暂停了当前协程,但可以去执行协程外逻辑而不会被阻塞;此时协程中启动的子线程也可以继续执行(相当于兵分两路,互不影响),当子线程中的逻辑执行完毕后,会自动从协程的挂起点恢复,这样就可以继续在主线程往下执行了。整体流程如下:执行suspend函数 -> 启动子线程 -> 函数挂起并记录挂起点 -> 协程暂停 -> 子线程执行完毕 -> 协程从挂起点恢复

如在上面的示例中,get() 在主线程中调用,内部在它启动网络请求之前挂起协程。虽然协程被挂起,但主线程并没有被阻塞,此时如果主线程中收到其他消息事件依然可以去处理(如点击事件等)。当网络请求完成时,get 会恢复已挂起的协程,继续执行show(result) 方法。这里注意一下,挂起函数并不一定会导致协程挂起,只有在发生异步调用时才会挂起。

Kotlin 使用 堆栈帧 管理要运行哪个函数以及所有局部变量。挂起协程时,系统会复制并保存当前的堆栈帧以供稍后使用。恢复时,会将堆栈帧从其保存位置复制回来,然后函数再次开始运行。即使代码可能看起来像普通的顺序阻塞请求,协程也能确保网络请求避免阻塞主线程。

CPS变换 + Continuation续体 + 状态机

CPS变换 (Continuation-Passing-Style Transformation) 是一种编程风格, 用来将内部要执行的逻辑封装到一个闭包里面, 然后再返回给调用者。上一节的例子中,协程就是通过传递 Continuation 来控制异步调用流程的:将函数挂起之后执行的逻辑包装起成一个Continuation, 里面包含了挂起点信息,这样当协程恢复时就可以在挂起点继续执行

Continuation意为续体,只存在于挂起函数中。Kotlin 中当一个普通函数加上suspend 关键字之后,就成为了挂起函数,如:

private suspend fun suspendFuc() {}

我们对此函数进行反编译,查看对应的Java代码为:

private final Object suspendFuc(Continuation $completion) {      return Unit.INSTANCE;}

可以看到Java代码中系统帮我们多生成了一个Continuation 类型的参数。Continuation 是一个接口类型,表示在返回T类型值的挂起点之后的延续,其中类型T代表的是原来函数的返回值类型。

@SinceKotlin("1.3")public interface Continuation<in T> {    /**     * The context of the coroutine that corresponds to this continuation.     */    public val context: CoroutineContext    /**     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the     * return value of the last suspension point.     */    public fun resumeWith(result: Result<T>)}
resumeWith 恢复执行相应协程,并传递一个结果作为最后一个挂起点的返回值。

· 状态机

上述函数中加入一个delay 函数,delay 函数本身也是一个挂起函数:

    private suspend fun suspendFuc(): String {        delay(1000)        return "value"    }

经过Tools -> Kotlin -> Show Kotlin Bytecode 反编译查看:

   private final Object suspendFuc(Continuation var1) {      Object $continuation;      label20: {         if (var1 instanceof <undefinedtype>) {            $continuation = (<undefinedtype>)var1;            if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {               ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;               break label20;            }         }                    //将要执行的Continuation逻辑传入ContinuationImpl中         $continuation = new ContinuationImpl(var1) {            // $FF: synthetic field            Object result;            int label;             //invokeSuspend()会在恢复协程挂起点时调用            @Nullable            public final Object invokeSuspend(@NotNull Object $result) {               this.result = $result;               this.label |= Integer.MIN_VALUE;               return CoroutineBaseFragment.this.suspendFuc(this);            }         };      }      Object $result = ((<undefinedtype>)$continuation).result;      //返回此状态意味着函数要被挂起      Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();            // 状态机逻辑,通过label进行分块执行      switch(((<undefinedtype>)$continuation).label) {      case 0:         ResultKt.throwOnFailure($result);         ((<undefinedtype>)$continuation).label = 1;         if (DelayKt.delay(1000L, (Continuation)$continuation) == var4) {            return var4;         }         break;      case 1:         ResultKt.throwOnFailure($result);         break;      default:         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");      }      return "value";   }

suspend挂起函数经过Kotlin编译器编译之后会进行CPS变换,并且函数里的逻辑会进行分块执行:分块逻辑数量 = 挂起函数数量 + 1,上述函数逻辑中,通过switch(label){} 状态机将逻辑分块执行,首先label = 0,此时会先将label置为1,接着调用了delay挂起函数,返回Intrinsics.COROUTINE_SUSPENDED意味着函数要被挂起;

当函数从协程挂起点恢复执行时,会调用$continuation#invokeSuspend(Object $result) 方法恢复执行,$result是上一次函数执行的结果,以参数形式带入之后,就可以从挂起点继续执行了。

此时label变为1,ResultKt.throwOnFailure($result)中通过 if (value is Result.Failure) throw value.exception判断上面的分块结果是否有异常,如果有直接抛异常;否则直接break,执行到最后的return "value" 逻辑,这样整个方法的流程就运行完毕了。

总结:suspend挂起函数的执行流程就是通过CPS变换 + Continuation + 状态机来运转的

CoroutineContext

我们知道 Android 中的 Context(子类有Application、Activity、Service)可以获取应用资源,可以启动Activity、Service等。CoroutineContext意为协程上下文,其作用可以类比 Context,通过它可以控制协程在哪个线程中执行、捕获协程异常、设置协程名称等。

public interface CoroutineContext {    //从该上下文中返回具有给定[key]的元素 或 null     public operator fun <E : Element> get(key: Key<E>): E?    //从initial开始累加上下文中的条目,并从左到右对当前累加器值和上下文中的每个元素应用operation    public fun <R> fold(initial: R, operation: (R, Element) -> R): R    //返回一个上下文,其中包含来自此上下文的元素和来自其他上下文context的元素。具有相同键的元素会被覆盖。    public operator fun plus(context: CoroutineContext): CoroutineContext =        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation            context.fold(this) { acc, element ->                val removed = acc.minusKey(element.key)                if (removed === EmptyCoroutineContext) element else {                    // make sure interceptor is always last in the context (and thus is fast to get when present)                    val interceptor = removed[ContinuationInterceptor]                    if (interceptor == null) CombinedContext(removed, element) else {                        val left = removed.minusKey(ContinuationInterceptor)                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else                            CombinedContext(CombinedContext(left, element), interceptor)                    }                }            }    //删除带有指定[key]的元素。    public fun minusKey(key: Key<*>): CoroutineContext    /**     * Key for the elements of [CoroutineContext]. [E] is a type of element with this key.     */    public interface Key<E : Element>    //CoroutineContext的一个元素。协程上下文的一个元素本身就是一个单例上下文。    public interface Element : CoroutineContext {        /**         * A key of this coroutine context element.         */        public val key: Key<*>        public override operator fun <E : Element> get(key: Key<E>): E? =            @Suppress("UNCHECKED_CAST")            if (this.key == key) this as E else null        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =            operation(initial, this)        public override fun minusKey(key: Key<*>): CoroutineContext =            if (this.key == key) EmptyCoroutineContext else this    }}

协程总是会运行在以 CoroutineContext 类型的上下文中,其中plus操作符被重写,多个CoroutineContext相加时,会生成一个CombinedContext( 内部可以理解成一个Element都不一样的链表);同时 CombinedContext拥有 map索引能力,集合中的每个元素都有一个唯一的 Key。

继承关系

Element 定义在CoroutineContext内部,是它的内部接口。CoroutineContext 使用以下元素集定义协程的行为:

· Job:控制协程的生命周期。

· CoroutineDispatcher:将工作分派到适当的线程。

· CoroutineName:协程的名称,可用于调试。

· CoroutineExceptionHandler:处理未捕获的异常。

下面就来详细看看几种元素的使用。

CoroutineContext几种具体实现

1、Job & SupervisorJob

Job是协程的句柄。使用launchasync创建的协程都会返回一个Job实例,该实例是对应协程的唯一标识并可以管理协程的生命周期。如:

val job1 = GlobalScope.launch { }val job2 = MainScope().launch { }override fun onDestroy() {    super.onDestroy()    job1.cancel()    job2.cancel()}

注:上面的协程启动方式并不推荐在项目中直接使用,因为生命周期比较长,如果没有主动关闭可能就会产生内存泄漏。推荐在ViewModel中使用viewModelScope,LifecycleOwner中使用lifecycleScope,可以在各自的生命周期中自动关闭协程。

  val job = GlobalScope.launch(context = Job() + Dispatchers.Main,      start = CoroutineStart.LAZY) {      // 逻辑部分   }        job常用的API:  -  job.start()  // 对应 start = CoroutineStart.LAZY  -  job.cancelAndJoin()  //cancel() + join()  -  job.cancel() // 取消  -  job.isActive // 协程是否存活  -  job.isCancelled // 协程是否被取消  -  job.isCompleted //协程是否已经执行完毕

Job 有以下状态:

StateisActiveisCompletedisCancelled
New (optional initial state)falsefalsefalse
Active (default initial state)truefalsefalse
Completing (transient state)truefalsefalse
Cancelling (transient state)falsefalsetrue
Cancelled (final state)falsetruetrue
Completed (final state)falsetruefalse

状态流转:

                                          wait children    +-----+ start  +--------+ complete   +-------------+  finish  +-----------+    | New | -----> | Active | ---------> | Completing  | -------> | Completed |    +-----+        +--------+            +-------------+          +-----------+                     |  cancel / fail       |                     |     +----------------+                     |     |                     V     V                 +------------+                           finish  +-----------+                 | Cancelling | --------------------------------> | Cancelled |                 +------------+                                   +-----------+

协程通过状态机方式实现自动回调。

SupervisorJob的使用

val exceptionHandler =                CoroutineExceptionHandler { context, throwable -> log("throwable:$throwable") }GlobalScope.launch(exceptionHandler) {   //子Job1    launch(SupervisorJob()) {        delay(200)        throw NullPointerException()     }   //子Job2    launch {        delay(300)        log("child2 execute successfully")    }    //子Job3    launch {        delay(400)        log("child3 execute successfully")    }    log("parent execute successfully")}

注意子Job1传入的是 SupervisorJob,当发生异常时,兄弟协程(job2/job3)不会被取消;如果是默认配置,那么兄弟协程也会被取消,上述代码执行结果:

2022-08-30 23:31:30.142  E/TTT: parent execute successfully2022-08-30 23:31:30.346  E/TTT: throwable:java.lang.NullPointerException2022-08-30 23:31:30.446  E/TTT: child2 execute successfully2022-08-30 23:31:30.545  E/TTT: child3 execute successfully

如果将job1中的更换为默认时,执行结果为:

2022-08-30 23:31:30.142  E/TTT: parent execute successfully2022-08-30 23:31:30.346  E/TTT: throwable:java.lang.NullPointerException

可以看到job1中使用默认设置时,当job1发生了异常,兄弟协程也会被取消。注意这里job2/job3可以被取消是因为delay()中有对取消状态的判断,通知job2/job3取消可以类比Thread.interrupt(),interrupt只是通知线程要中断并设置一个中断状态,最终要不要中断还是线程自己说了算。所以如果改成以下代码,即使job1使用默认配置,job3也不会被取消:

//子Job3 launch(Dispatchers.IO) {     Thread.sleep(400)     log("child3 execute successfully") }

job3中没有了对协程状态的判断,所以即使job3被通知要取消协程了,依然会继续执行直到结束。

SuperviorJob源码浅析
public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {    override fun childCancelled(cause: Throwable): Boolean = false}

当子协程 job1/job2/job3 启动时,会和协程本身的 Job 形成父子关系。而当子协程抛异常时,父Job 的 childCancelled() 方法会被执行,且默认返回的是 true,表示取消 父Job 及其所有 子Job;如果 子Job中使用的是 SuperviorJob,childCancelled() 返回的是 false,表示 父Job及其未发生异常的 子Job都不会受影响。

2、CoroutineDispatcher
public actual object Dispatchers {   @JvmStatic    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()        @JvmStatic    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher        @JvmStatic    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined        @JvmStatic    public val IO: CoroutineDispatcher = DefaultScheduler.IO}

将工作分派到适当的线程。

· Dispatchers.Main:运行在UI线程中。Dispatchers.Main.immediate: 如果在UI线程加载,不会做特殊处理;如果是在子线程,会通过Handler转发到主线程

· Dispatchers.IO: 执行IO密集型任务,最多提交max(64, CPU核心数)个任务执行。

· Dispatchers.DEFAULT :执行CPU密集型任务, CoroutineScheduler最多有corePoolSize个线程被创建,corePoolSize的取值为max(2, CPU核心数),即它会尽量的等于CPU核心数

· Dispatchers.Unconfined:不给协程指定运行的线程,由启动协程的线程决定;但当被挂起后, 会由恢复协程的线程继续执行。内部通过ThreadLocal保存执行协程时对应的线程,用于恢复协程时在取出对应线程并在其继续执行协程。

withContext

withContext() 可以在不引入回调的情况下控制任何代码行的线程池,因此可以将其应用于非常小的函数,例如从数据库中读取数据或执行网络请求。一种不错的做法是使用 withContext() 来确保每个函数都是主线程安全的,这意味着,您可以从主线程调用每个函数。这样,调用方就从不需要考虑应该使用哪个线程来执行函数了。

3、CoroutineName

协程的名称,可用于调试。

4、CoroutineExceptionHandler
public interface CoroutineExceptionHandler : CoroutineContext.Element {      public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>    //当有未处理的异常发生时,该方法就会执行    public fun handleException(context: CoroutineContext, exception: Throwable)}

CoroutineExceptionHandler当有未捕获的异常时就会触发执行。如果不在顶级协程中设置,那么当有异常发生时会导致程序的crash,可以通过自定义CoroutineExceptionHandler来拦截异常,如下:

val exceptionHandler =                CoroutineExceptionHandler { context, throwable -> log("throwable:$throwable") }lifecycleScope.launch(exceptionHandler) { // do something }              

自定义CoroutineExceptionHandler调用的是如下方法:

public inline fun CoroutineExceptionHandler(crossinline handler: (CoroutineContext, Throwable) -> Unit): CoroutineExceptionHandler =    object : AbstractCoroutineContextElement(CoroutineExceptionHandler), CoroutineExceptionHandler {        override fun handleException(context: CoroutineContext, exception: Throwable) =            handler.invoke(context, exception)    }

所以当发生异常时,会在handleException()中执行我们传入的handler(CoroutineContext, Throwable)方法。

CoroutineExceptionHandler使用示例

 val exceptionHandler =                CoroutineExceptionHandler { context, throwable -> log("parent throwable:$throwable") } lifecycleScope.launch(exceptionHandler) {     log("parent execute start")     val childExHandler =                    CoroutineExceptionHandler { context, throwable -> log("child throwable:$throwable") }     //子协程中如果使用SupervisorJob()、Job(),则异常不会往上传播;否则异常会在顶层协程中处理     val childJob = launch(childExHandler) {         delay(1000)         log("child execute")         throw  IllegalArgumentException("error occur")     }     childJob.join()     log("parent execute end") }

执行结果:

2022-09-04 00:38:51.792 15415-15415/ E/TTT: parent execute start2022-09-04 00:38:52.796 15415-15415/ E/TTT: child execute2022-09-04 00:38:52.805 15415-15415/ E/TTT: parent throwable:java.lang.IllegalArgumentException: error occur

可见虽然在子协程中也自定义了CoroutineExceptionHandler,但是最终子协程中的异常还是在顶层的父协程种处理的,如果就想在子协程中处理异常呢?可以在子协程中加上SupervisorJob()、Job(),则异常会在子协程中处理而不会往上传播:

 //其他内容不变 val childJob = launch(childExHandler + SupervisorJob()) {     //其他内容不变 }

执行结果:

2022-09-04 00:42:48.465 15755-15755/ E/TTT: parent execute start2022-09-04 00:42:49.468 15755-15755/ E/TTT: child execute2022-09-04 00:42:49.473 15755-15755/ E/TTT: child throwable:java.lang.IllegalArgumentException: error occur2022-09-04 00:42:49.474 15755-15755/ E/TTT: parent execute end

可见异常最终是在子协程中处理的,且虽然子协程中发生了异常,父协程依然能执行完毕。

:如果启动协程的是async,则CoroutineExceptionHandler中的handler方法并不会马上执行,必须调用deffered.await()时才会执行。

CoroutineScope 协程作用域

CoroutineScope 会跟踪它使用 launch 或 async 创建的所有协程。CoroutineScope 本身并不运行协程,但是通过CoroutineScope可以保证对协程进行统一管理,避免发生内存泄漏等,

常见的CoroutineScope:

· GlobalScope: 全局协程作用域,如果不主动通过job.cancel()关闭,其生命周期与Application一致。

· MainScope: MainScope的内部:public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main),可以看到MainScope其实是通过SupervisorJob() + Dispatchers.Main自定义的协程作用域,其内部运行在UI线程且内部异常不会往上传播。

· runBlocking: 顶层函数,和 GlobalScope 不一样,它会阻塞当前线程直到其内部所有子协程执行结束;

· ktx扩展库中的自定义GlobalScope:典型应用就是在LifecycleOwner(androidX中的FragmentActivity、Fragment都实现了该接口)使用的lifecycleScope,以及ViewModel中使用的ViewModelScope,其内部可以在合适的时机自动关闭协程,从而避免内存泄露的发生。



阅读量:1477

点赞量:0

收藏量:0