协程(21) | 结构化并发原理解析-灵析社区

德州安卓

前言

在之前文章介绍协程时,我们说过协程有个特性就是结构化并发,这是因为协程是具有父子关系的,取消父协程,会让所有子协程都取消,这可以有效防止内存泄漏。

那本篇文章就来看看结构化并发的原理。

正文

在协程框架的中层概念中,CoroutineScope就是实现结构化并发的关键,其实从字面意思也非常好理解,协程作用域,也就是规定了一个作用域,可以批量管理一个作用域内的所有协程。

为什么有CoroutineScope

其实越是到后面,越容易串起来整个协程框架的知识,让知识形成体系。

我们这里回顾一下启动协程的2个API:launchasync

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    ...
}
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    ...
}

这里发现它们都是CoroutineScope的扩展函数,这里为什么要设计为CoroutineScope的扩展函数呢?

其实不然,在早期的协程API,这2个函数还真不是CoroutineScope的扩展函数,假如是使用早期API,伪代码如下:

// 使用协程最初的API,只是伪代码
private fun testScopeJob() {
    val job = Job()
    launch(job){
        launch {
            delay(1000000L)
            logX("Inner")
        }
        logX("Hello!")
        delay(1000000L)
        logX("World!")  // 不会执行
    }

    launch(job){
        launch {
            delay(1000000L)
            logX("Inner!!!")
        }
        logX("Hello!!!")
        delay(1000000L)
        logX("World1!!!")  // 不会执行
    }
    Thread.sleep(500L)
    job.cancel()
}

这里想实现结构化并发,我们不得不创建一个Job对象,然后传入launch中当做参数,但是开发者可能会忘记传输这个参数,所以就会打破结构化关系。

所以后面发展就专门设计出CoroutineScope来管理协程批量处理,而且把launchasync都作为该类的扩展函数,这样就不会有前面所说的忘记传递参数从而导致的非结构关系。

原理分析

从前面协程API的迭代就可以看出,其实起作用的还是Job,而CoroutineScope中包含了协程上下文,协程上下文又包含了Job,所以我们还是以launch{}启动协程为例,来分析其结构化并发的原理。

创建父子关系

这里我们写出下面示例代码:

private fun testScope() {
    //新建一个CoroutineScope
    val scope = CoroutineScope(Job())
    //由于launch是CoroutineScope的扩展函数
    scope.launch{
        //block函数类型参数的接收者是CoroutineScope
        launch {
            delay(1000000L)
            logX("Inner")  // 不会执行
        }
        logX("Hello!")
        delay(1000000L)
        logX("World!")  // 不会执行
    }

    Thread.sleep(500L)
    // 2
    scope.cancel()
}

在上面代码中,有个值得注意的地方,就是launch方法不仅仅是CoroutineScope的扩展函数,它的block类型是:suspend CoroutineScope.() -> Unit,所以在协程体中,我们依旧可以调用launch方法。

这里我们创建了一个CoroutineScope,这里我们来看一下这个方法源码:

public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
    ContextScope(if (context[Job] != null) context else context + Job())
internal class ContextScope(context: CoroutineContext) : CoroutineScope {
    override val coroutineContext: CoroutineContext = context
    override fun toString(): String = "CoroutineScope(coroutineContext=$coroutineContext)"
}

可以发现CoroutineScope()是一个顶层函数,同理函数体内部的Job()也是一个顶层函数,这里还有一个小知识点:当顶层函数当做"构造函数"来使用时,这个函数的命名可以不使用驼峰命名法,而是以大写开始。

这里返回的是CoroutineScope,在前面文章我们知道它是对CoroutineContext的封装:

public interface CoroutineScope 
    public val coroutineContext: CoroutineContext
}

CoroutineScope()方法中,通过context[Job]就可以取出保存在context中的Job对象,假如没有Job对象的话,就创建一个Job对象传入到context中,这说明一件事,每一个CoroutineScope对象,它的context当中必然存在一个Job对象。

同时也说明在调用CroutineScope()方法时,也可以不传Job对象。

接着我们继续看launch的源码:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    //注释1 继承父协程的上下文
    val newContext = newCoroutineContext(context)
    //注释2 创建协程对象
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    //注释3 开始协程
    coroutine.start(start, coroutine, block)
    return coroutine
}

这里的注释1和3,分别在前面文章分析launch启动以及线程调度都分析过了,现在轮到了注释2:

//Standalone翻译就是独立,即独立运行的协程
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
    //继承协程抽象类,泛型为Unit,initParentJob为true
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

//懒惰的协程
private class LazyStandaloneCoroutine(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> Unit
    //继承至上面的类,initParentJob还是为true
) : StandaloneCoroutine(parentContext, active = false) {
    private val continuation = block.createCoroutineUnintercepted(this, this)

    override fun onStart() {
        continuation.startCoroutineCancellable(this)
    }
}

可以发现StandaloneCoroutineAbstractCoroutine的子类,在前面文章中我们说过这个可以看成是代表协程的抽象类,在调用其构造函数时,第二个参数initParentJob参数,一直为true,其实就是代表了协程创建以后,需要初始化协程的父子关系。

AbstractCoroutine构造函数如下:

public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {

    init {
        if (initParentJob) initParentJob(parentContext[Job])
    }
 }

该类的继承关系,在上一篇文章中我们重点分析了继承Continuation分支的,主要是用来调用intercepted()来拦截其线程调度器,本篇文章重点就是其JobSupport类。

这里initParentJob参数我们从前面可知,这里必为true,即需要初始化父子关系,其中initParentJob()函数定义在JobSupport类中:

public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0

JobSupport内容比较多,可以把它看成一个具体化的Job实现,这是因为关于Job的各种操作,都是在该类中完成的。

下面是initParentJob方法:

//parent就是父协程的Job
protected fun initParentJob(parent: Job?) {
    assert { parentHandle == null }
    //当没有父协程时,不需要创建和父协程的关系
    if (parent == null) {
        parentHandle = NonDisposableHandle
        return
    }
    //确保父协程已经启动了
    parent.start()
    //把当前Job添加到父Job中
    val handle = parent.attachChild(this)
    parentHandle = handle
    // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
    if (isCompleted) {
        handle.dispose()
        parentHandle = NonDisposableHandle // release it just in case, to aid GC
    }
}

上面代码比较简单,看注释即可,所以我们可以把协程看成一颗N叉树,每一个协程都对应一个Job对象,而每一个Job可以有一个父Job和多个多个子Job

结构化取消

既然Job的关系如上图中的N叉树,所以结构化取消原理其实也就是事件传递了。

当某个Job收到取消事件时,需要通知其上下级。这个设计思路,就和我们公司架构一样,当某个人需要通知重要事情时,可以先告诉其下属,再告诉其领导,通过循环迭代从而可以让整个公司都知道。

我们可以想象出其取消协程的代码应该如下:

fun Job.cancelJob() {
    //通知子Job取消
    children.forEach {
        cancelJob()
    }
    //通知父Job取消
    notifyParentCancel()
}

当然这是只是简化的伪代码,真实代码复杂很多,但是原理差不多。

我们先来看一下CoroutineScopecancel函数的代码:

public fun CoroutineScope.cancel(cause: CancellationException? = null) {
    val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
    job.cancel(cause)
}

正常情况下,我们调用scope.cancel()时,一般都不会传递参数,假如要传递额外说明参数,这里必须是CancellationException类型的。

在方法实现中,我们会发现真实是调用Jobcancel()方法,该方法的实现就是在前面所说的JobSupport类中:

//外部带原因的取消,内部不能隐式调用
public override fun cancel(cause: CancellationException?) {
    cancelInternal(cause ?: defaultCancellationException())
}

这个方法是供外部来调用的,这里注意当cause为空时,这里在调用cancelInternal时会传入一个默认的CancellationException实现:

public open fun cancelInternal(cause: Throwable) {
    cancelImpl(cause)
}

该方法由方法名可以看出是内部调用,这种设计思路,我们在平时也可以使用,方法的访问权限要严格分开,该方法的参数类型是Throwable类型,会调用下面方法:

internal fun cancelImpl(cause: Any?): Boolean {
    var finalState: Any? = COMPLETING_ALREADY
    if (onCancelComplete) {
        //1
        finalState = cancelMakeCompleting(cause)
        if (finalState === COMPLETING_WAITING_CHILDREN) return true
    }
    if (finalState === COMPLETING_ALREADY) {
        //2
        finalState = makeCancelling(cause)
    }
    return when {
        finalState === COMPLETING_ALREADY -> true
        finalState === COMPLETING_WAITING_CHILDREN -> true
        finalState === TOO_LATE_TO_CANCEL -> false
        else -> {
            afterCompletion(finalState)
            true
        }
    }
}

在该方法中,cause的类型是Any?,其实从源码注释我们可以知道该参数可能是Throwable,也可能是一个ParentJob,第二种情况只会在cancelChild方法被调用时传入。

而且该方法返回true则表示异常被处理,否则表示没有被处理。

那么为什么该类中有这么多状态判断呢?原因非常简单,因为Job的状态变化是一个持续过程,只有子Job都取消完成后,该Job才能算完成了。

所以这里会调用注释1的cancelMakeComplting方法:

private fun cancelMakeCompleting(cause: Any?): Any? {
    loopOnState { state ->
        // 省略部分
        val finalState = tryMakeCompleting(state, proposedUpdate)
        if (finalState !== COMPLETING_RETRY) return finalState
    }
}

从方法名中的Completing为完成、完整的意思就可以看出,这个过程是一个持续的过程,这里有一个循环方法loopOnState,我们可以在日常项目中借鉴一下:

private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
    while (true) {
        block(state)
    }
}

这里的核心还是调用tryMakeCompleting方法:

private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
    if (state !is Incomplete)
        return COMPLETING_ALREADY
        ...
        // 省略部分
        return COMPLETING_RETRY
    }
    ...
    return tryMakeCompletingSlowPath(state, proposedUpdate)
}

通过源码注释,我们可知该方法会返回状态,而且是已完成的状态,比如这里的COMPLETING_ALREADYCOMPLETING_RETRY等,同时在该方法中分出了2个分支。

一个是快速返回分支,当该Job没有子Job,可以立即返回。当有子Job时,才会调用tryMakeCompletingSlowPath方法,这也是简化函数逻辑的一种常见手段,方法如下:

private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
    // 省略部分
    notifyRootCause?.let { notifyCancelling(list, it) }

    return finalizeFinishingState(finishing, proposedUpdate)
}

这里代码调用比较复杂,我们可以不用关注,最终会调用notifyCancelling方法,这个才是最关键的代码。

前面为什么调用一个取消要附带这么多状态判断,也是因为Job需要管理协程的状态,即只有子Job都完成时,父Job才算完成,所以这是一个持续过程。

我们看一下这个核心方法:

private fun notifyCancelling(list: NodeList, cause: Throwable) {
    onCancelling(cause)
    // 1,通知子Job
    notifyHandlers<JobCancellingNode>(list, cause)
    // 2,通知父Job
    cancelParent(cause)
}

这个方法和我们前面所说的伪代码逻辑基本一致了,我们分别来看看其中的逻辑:

//通知子Job进行取消
private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
    var exception: Throwable? = null
    list.forEach<T> { node ->
        try {
            //调用每个子Job的invoke方法
            node.invoke(cause)
        } catch (ex: Throwable) {
            exception?.apply { addSuppressedThrowable(ex) } ?: run {
                exception =  CompletionHandlerException("Exception in completion handler $node for $this", ex)
            }
        }
    }
    exception?.let { handleOnCompletionException(it) }
}

这里就是遍历当前Job的子Job,并且将取消的case传递过去,这里的invoke()最终会调用ChildHandleNodeinvoke()方法:

//这里是Node类型,也侧面说明了Job是树结构
internal class ChildHandleNode(
    @JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
    override val parent: Job get() = job
    //调用parentCancelled方法
    override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
    override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}

//JobSupport中实现
public final override fun parentCancelled(parentJob: ParentJob) {
    cancelImpl(parentJob)
}

而这里代码最终会调用cancelImpl()方法,即对应了前面所说的该方法参数可能是一个Job,同时也说明这是一个递归调用,一直会调用到没有子JobJob

我们接着看一下如何通知父Job:

private fun cancelParent(cause: Throwable): Boolean {
    if (isScopedCoroutine) return true
    //是否是CancellationException异常
    val isCancellation = cause is CancellationException
    val parent = parentHandle

    if (parent === null || parent === NonDisposableHandle) {
        return isCancellation
    }
    // 1
    return parent.childCancelled(cause) || isCancellation
}

注意注释1的返回值,这个返回值是有意义的,返回true代表父协程处理了异常,而返回false,代表父协程没有处理异常。

该方法代码如下:

public open fun childCancelled(cause: Throwable): Boolean {
    //特殊处理取消异常
    if (cause is CancellationException) return true
    return cancelImpl(cause) && handlesException
}

这里我们发现当异常是CancellationException的时候,协程是会进行特殊处理的。一般来说,父协程会忽略子协程的取消异常,当是其他异常时,那么父协程就会响应子协程的取消了。这时又会调用cancelImpl(),来继续递归调用。

这里我们再结合前面文章所说的协程异常处理,我们就说过对于CancellationException异常要特殊处理,一般都是要抛出去,这里我们就可以看到原因了,原来协程的结构化取消,是需要依赖这个异常的。

这也就说明一件事,当出现CancellationException异常时,只会向下传播,来达到结构化取消的效果;但是当是其他异常时,则会双向传递,如下图:

SupervisorJob原理

在之前文章,我们说过一个特殊的Job,就是SupervisorJob,它可以防止子协程异常的蔓延,这时我们就可以知道其实现原理了:

//顶层函数当构造函数使用
public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)
//这里一直返回false
private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {
    override fun childCancelled(cause: Throwable): Boolean = false
}

根据上面源码分析,这个childCancelled方法是用来对上报告的,这里直接返回false且不处理,也就是不论是什么异常都不会蔓延到其他兄弟Job

总结

本篇文章涉及的代码跳转较多,我们做个总结:

  1. 每次创建CoroutineScope的时候,它的内部会确保CoroutineContext当中一定有Job元素,而CoroutineScope就是通过这个Job对象来管理协程的。
  2. 在我们通过launchasync创建协程的时候,会同时创建AbstractCoroutine的子类,在它的initParentJob()方法中,会建立父子关系。每个协程都会对应一个Job,而每个Job都会有一个父Job,多个子Job。最终他们会形成一个N叉树的结构。
  3. 由于协程是一个N叉树的结构,因此协程的取消事件以及异常传播,也会按照这个结构进行传递。每个Job取消的时候,都会通知自己的子Job和父Job,最终以递归的形式传递给每一个子协程。
  4. 协程向上取消父Job的时候,还利用了责任链模式,确保取消事件可以一步步传递到顶层的协程。这里还有一个细节就是,默认情况下,父协程会忽略子协程的CancellationException
  5. 对于其他异常,父协程不仅会响应,还会造成其他兄弟Job出现异常,所以这里可以使用SupervisorJob来阻断异常的向上传递。

阅读量:508

点赞量:0

收藏量:0