在之前文章介绍协程时,我们说过协程有个特性就是结构化并发,这是因为协程是具有父子关系的,取消父协程,会让所有子协程都取消,这可以有效防止内存泄漏。
那本篇文章就来看看结构化并发的原理。
在协程框架的中层概念中,CoroutineScope
就是实现结构化并发的关键,其实从字面意思也非常好理解,协程作用域,也就是规定了一个作用域,可以批量管理一个作用域内的所有协程。
CoroutineScope
其实越是到后面,越容易串起来整个协程框架的知识,让知识形成体系。
我们这里回顾一下启动协程的2个API:launch
和async
:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
...
}
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
...
}
这里发现它们都是CoroutineScope
的扩展函数,这里为什么要设计为CoroutineScope
的扩展函数呢?
其实不然,在早期的协程API,这2个函数还真不是CoroutineScope
的扩展函数,假如是使用早期API,伪代码如下:
// 使用协程最初的API,只是伪代码
private fun testScopeJob() {
val job = Job()
launch(job){
launch {
delay(1000000L)
logX("Inner")
}
logX("Hello!")
delay(1000000L)
logX("World!") // 不会执行
}
launch(job){
launch {
delay(1000000L)
logX("Inner!!!")
}
logX("Hello!!!")
delay(1000000L)
logX("World1!!!") // 不会执行
}
Thread.sleep(500L)
job.cancel()
}
这里想实现结构化并发,我们不得不创建一个Job
对象,然后传入launch
中当做参数,但是开发者可能会忘记传输这个参数,所以就会打破结构化关系。
所以后面发展就专门设计出CoroutineScope
来管理协程批量处理,而且把launch
和async
都作为该类的扩展函数,这样就不会有前面所说的忘记传递参数从而导致的非结构关系。
从前面协程API的迭代就可以看出,其实起作用的还是Job
,而CoroutineScope
中包含了协程上下文,协程上下文又包含了Job
,所以我们还是以launch{}
启动协程为例,来分析其结构化并发的原理。
这里我们写出下面示例代码:
private fun testScope() {
//新建一个CoroutineScope
val scope = CoroutineScope(Job())
//由于launch是CoroutineScope的扩展函数
scope.launch{
//block函数类型参数的接收者是CoroutineScope
launch {
delay(1000000L)
logX("Inner") // 不会执行
}
logX("Hello!")
delay(1000000L)
logX("World!") // 不会执行
}
Thread.sleep(500L)
// 2
scope.cancel()
}
在上面代码中,有个值得注意的地方,就是launch
方法不仅仅是CoroutineScope
的扩展函数,它的block
类型是:suspend CoroutineScope.() -> Unit
,所以在协程体中,我们依旧可以调用launch
方法。
这里我们创建了一个CoroutineScope
,这里我们来看一下这个方法源码:
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
ContextScope(if (context[Job] != null) context else context + Job())
internal class ContextScope(context: CoroutineContext) : CoroutineScope {
override val coroutineContext: CoroutineContext = context
override fun toString(): String = "CoroutineScope(coroutineContext=$coroutineContext)"
}
可以发现CoroutineScope()
是一个顶层函数,同理函数体内部的Job()
也是一个顶层函数,这里还有一个小知识点:当顶层函数当做"构造函数"来使用时,这个函数的命名可以不使用驼峰命名法,而是以大写开始。
这里返回的是CoroutineScope
,在前面文章我们知道它是对CoroutineContext
的封装:
public interface CoroutineScope
public val coroutineContext: CoroutineContext
}
在CoroutineScope()
方法中,通过context[Job]
就可以取出保存在context
中的Job
对象,假如没有Job
对象的话,就创建一个Job
对象传入到context
中,这说明一件事,每一个CoroutineScope
对象,它的context
当中必然存在一个Job
对象。
同时也说明在调用CroutineScope()
方法时,也可以不传Job
对象。
接着我们继续看launch
的源码:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
//注释1 继承父协程的上下文
val newContext = newCoroutineContext(context)
//注释2 创建协程对象
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
//注释3 开始协程
coroutine.start(start, coroutine, block)
return coroutine
}
这里的注释1和3,分别在前面文章分析launch
启动以及线程调度都分析过了,现在轮到了注释2:
//Standalone翻译就是独立,即独立运行的协程
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
//继承协程抽象类,泛型为Unit,initParentJob为true
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}
//懒惰的协程
private class LazyStandaloneCoroutine(
parentContext: CoroutineContext,
block: suspend CoroutineScope.() -> Unit
//继承至上面的类,initParentJob还是为true
) : StandaloneCoroutine(parentContext, active = false) {
private val continuation = block.createCoroutineUnintercepted(this, this)
override fun onStart() {
continuation.startCoroutineCancellable(this)
}
}
可以发现StandaloneCoroutine
是AbstractCoroutine
的子类,在前面文章中我们说过这个可以看成是代表协程的抽象类,在调用其构造函数时,第二个参数initParentJob
参数,一直为true
,其实就是代表了协程创建以后,需要初始化协程的父子关系。
AbstractCoroutine
构造函数如下:
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
init {
if (initParentJob) initParentJob(parentContext[Job])
}
}
该类的继承关系,在上一篇文章中我们重点分析了继承Continuation
分支的,主要是用来调用intercepted()
来拦截其线程调度器,本篇文章重点就是其JobSupport
类。
这里initParentJob
参数我们从前面可知,这里必为true
,即需要初始化父子关系,其中initParentJob()
函数定义在JobSupport
类中:
public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0
JobSupport
内容比较多,可以把它看成一个具体化的Job
实现,这是因为关于Job
的各种操作,都是在该类中完成的。
下面是initParentJob
方法:
//parent就是父协程的Job
protected fun initParentJob(parent: Job?) {
assert { parentHandle == null }
//当没有父协程时,不需要创建和父协程的关系
if (parent == null) {
parentHandle = NonDisposableHandle
return
}
//确保父协程已经启动了
parent.start()
//把当前Job添加到父Job中
val handle = parent.attachChild(this)
parentHandle = handle
// now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
if (isCompleted) {
handle.dispose()
parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
}
上面代码比较简单,看注释即可,所以我们可以把协程看成一颗N叉树,每一个协程都对应一个Job
对象,而每一个Job
可以有一个父Jo
b和多个多个子Job
。
既然Job
的关系如上图中的N叉树,所以结构化取消原理其实也就是事件传递了。
当某个Job
收到取消事件时,需要通知其上下级。这个设计思路,就和我们公司架构一样,当某个人需要通知重要事情时,可以先告诉其下属,再告诉其领导,通过循环迭代从而可以让整个公司都知道。
我们可以想象出其取消协程的代码应该如下:
fun Job.cancelJob() {
//通知子Job取消
children.forEach {
cancelJob()
}
//通知父Job取消
notifyParentCancel()
}
当然这是只是简化的伪代码,真实代码复杂很多,但是原理差不多。
我们先来看一下CoroutineScope
的cancel
函数的代码:
public fun CoroutineScope.cancel(cause: CancellationException? = null) {
val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
job.cancel(cause)
}
正常情况下,我们调用scope.cancel()
时,一般都不会传递参数,假如要传递额外说明参数,这里必须是CancellationException
类型的。
在方法实现中,我们会发现真实是调用Job
的cancel()
方法,该方法的实现就是在前面所说的JobSupport
类中:
//外部带原因的取消,内部不能隐式调用
public override fun cancel(cause: CancellationException?) {
cancelInternal(cause ?: defaultCancellationException())
}
这个方法是供外部来调用的,这里注意当cause
为空时,这里在调用cancelInternal
时会传入一个默认的CancellationException
实现:
public open fun cancelInternal(cause: Throwable) {
cancelImpl(cause)
}
该方法由方法名可以看出是内部调用,这种设计思路,我们在平时也可以使用,方法的访问权限要严格分开,该方法的参数类型是Throwable
类型,会调用下面方法:
internal fun cancelImpl(cause: Any?): Boolean {
var finalState: Any? = COMPLETING_ALREADY
if (onCancelComplete) {
//1
finalState = cancelMakeCompleting(cause)
if (finalState === COMPLETING_WAITING_CHILDREN) return true
}
if (finalState === COMPLETING_ALREADY) {
//2
finalState = makeCancelling(cause)
}
return when {
finalState === COMPLETING_ALREADY -> true
finalState === COMPLETING_WAITING_CHILDREN -> true
finalState === TOO_LATE_TO_CANCEL -> false
else -> {
afterCompletion(finalState)
true
}
}
}
在该方法中,cause
的类型是Any?
,其实从源码注释我们可以知道该参数可能是Throwable
,也可能是一个ParentJob
,第二种情况只会在cancelChild
方法被调用时传入。
而且该方法返回true
则表示异常被处理,否则表示没有被处理。
那么为什么该类中有这么多状态判断呢?原因非常简单,因为Job
的状态变化是一个持续过程,只有子Job
都取消完成后,该Job
才能算完成了。
所以这里会调用注释1的cancelMakeComplting
方法:
private fun cancelMakeCompleting(cause: Any?): Any? {
loopOnState { state ->
// 省略部分
val finalState = tryMakeCompleting(state, proposedUpdate)
if (finalState !== COMPLETING_RETRY) return finalState
}
}
从方法名中的Completing
为完成、完整的意思就可以看出,这个过程是一个持续的过程,这里有一个循环方法loopOnState
,我们可以在日常项目中借鉴一下:
private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
while (true) {
block(state)
}
}
这里的核心还是调用tryMakeCompleting
方法:
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
if (state !is Incomplete)
return COMPLETING_ALREADY
...
// 省略部分
return COMPLETING_RETRY
}
...
return tryMakeCompletingSlowPath(state, proposedUpdate)
}
通过源码注释,我们可知该方法会返回状态,而且是已完成的状态,比如这里的COMPLETING_ALREADY
、COMPLETING_RETRY
等,同时在该方法中分出了2个分支。
一个是快速返回分支,当该Job
没有子Job
,可以立即返回。当有子Job
时,才会调用tryMakeCompletingSlowPath
方法,这也是简化函数逻辑的一种常见手段,方法如下:
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
// 省略部分
notifyRootCause?.let { notifyCancelling(list, it) }
return finalizeFinishingState(finishing, proposedUpdate)
}
这里代码调用比较复杂,我们可以不用关注,最终会调用notifyCancelling
方法,这个才是最关键的代码。
前面为什么调用一个取消要附带这么多状态判断,也是因为Job
需要管理协程的状态,即只有子Job
都完成时,父Job
才算完成,所以这是一个持续过程。
我们看一下这个核心方法:
private fun notifyCancelling(list: NodeList, cause: Throwable) {
onCancelling(cause)
// 1,通知子Job
notifyHandlers<JobCancellingNode>(list, cause)
// 2,通知父Job
cancelParent(cause)
}
这个方法和我们前面所说的伪代码逻辑基本一致了,我们分别来看看其中的逻辑:
//通知子Job进行取消
private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
var exception: Throwable? = null
list.forEach<T> { node ->
try {
//调用每个子Job的invoke方法
node.invoke(cause)
} catch (ex: Throwable) {
exception?.apply { addSuppressedThrowable(ex) } ?: run {
exception = CompletionHandlerException("Exception in completion handler $node for $this", ex)
}
}
}
exception?.let { handleOnCompletionException(it) }
}
这里就是遍历当前Job
的子Job
,并且将取消的case
传递过去,这里的invoke()
最终会调用ChildHandleNode
的invoke()
方法:
//这里是Node类型,也侧面说明了Job是树结构
internal class ChildHandleNode(
@JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
override val parent: Job get() = job
//调用parentCancelled方法
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}
//JobSupport中实现
public final override fun parentCancelled(parentJob: ParentJob) {
cancelImpl(parentJob)
}
而这里代码最终会调用cancelImpl()
方法,即对应了前面所说的该方法参数可能是一个Job
,同时也说明这是一个递归调用,一直会调用到没有子Job
的Job
。
我们接着看一下如何通知父Job
:
private fun cancelParent(cause: Throwable): Boolean {
if (isScopedCoroutine) return true
//是否是CancellationException异常
val isCancellation = cause is CancellationException
val parent = parentHandle
if (parent === null || parent === NonDisposableHandle) {
return isCancellation
}
// 1
return parent.childCancelled(cause) || isCancellation
}
注意注释1的返回值,这个返回值是有意义的,返回true
代表父协程处理了异常,而返回false
,代表父协程没有处理异常。
该方法代码如下:
public open fun childCancelled(cause: Throwable): Boolean {
//特殊处理取消异常
if (cause is CancellationException) return true
return cancelImpl(cause) && handlesException
}
这里我们发现当异常是CancellationException
的时候,协程是会进行特殊处理的。一般来说,父协程会忽略子协程的取消异常,当是其他异常时,那么父协程就会响应子协程的取消了。这时又会调用cancelImpl()
,来继续递归调用。
这里我们再结合前面文章所说的协程异常处理,我们就说过对于CancellationException
异常要特殊处理,一般都是要抛出去,这里我们就可以看到原因了,原来协程的结构化取消,是需要依赖这个异常的。
这也就说明一件事,当出现CancellationException
异常时,只会向下传播,来达到结构化取消的效果;但是当是其他异常时,则会双向传递,如下图:
SupervisorJob
原理在之前文章,我们说过一个特殊的Job
,就是SupervisorJob
,它可以防止子协程异常的蔓延,这时我们就可以知道其实现原理了:
//顶层函数当构造函数使用
public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)
//这里一直返回false
private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {
override fun childCancelled(cause: Throwable): Boolean = false
}
根据上面源码分析,这个childCancelled
方法是用来对上报告的,这里直接返回false
且不处理,也就是不论是什么异常都不会蔓延到其他兄弟Job
。
本篇文章涉及的代码跳转较多,我们做个总结:
CoroutineScope
的时候,它的内部会确保CoroutineContext
当中一定有Job
元素,而CoroutineScope
就是通过这个Job
对象来管理协程的。launch
、async
创建协程的时候,会同时创建AbstractCoroutine
的子类,在它的initParentJob()
方法中,会建立父子关系。每个协程都会对应一个Job
,而每个Job
都会有一个父Job
,多个子Job
。最终他们会形成一个N叉树的结构。Job
取消的时候,都会通知自己的子Job
和父Job
,最终以递归的形式传递给每一个子协程。Job
的时候,还利用了责任链模式,确保取消事件可以一步步传递到顶层的协程。这里还有一个细节就是,默认情况下,父协程会忽略子协程的CancellationException
。Job
出现异常,所以这里可以使用SupervisorJob
来阻断异常的向上传递。阅读量:508
点赞量:0
收藏量:0