盒子
盒子
文章目录
  1. ContinuationInterceptor
  2. CoroutineDispatcher
  3. withContext
  4. 项目

Kotlin协程实现原理:ContinuationInterceptor&CoroutineDispatcher

今天我们来聊聊Kotlin的协程Coroutine。

如果你还没有接触过协程,推荐你先阅读这篇入门级文章What? 你还不知道Kotlin Coroutine?

如果你已经接触过协程,但对协程的原理存在疑惑,那么在阅读本篇文章之前推荐你先阅读下面的文章,这样能让你更全面更顺畅的理解这篇文章。

Kotlin协程实现原理:Suspend&CoroutineContext

Kotlin协程实现原理:CoroutineScope&Job

如果你已经接触过协程,相信你都有过以下几个疑问:

  1. 协程到底是个什么东西?
  2. 协程的suspend有什么作用,工作原理是怎样的?
  3. 协程中的一些关键名称(例如:Job、Coroutine、Dispatcher、4. CoroutineContext与CoroutineScope)它们之间到底是怎么样的关系?
  4. 协程的所谓非阻塞式挂起与恢复又是什么?
  5. 协程的内部实现原理是怎么样的?

接下来的一些文章试着来分析一下这些疑问,也欢迎大家一起加入来讨论。

ContinuationInterceptor

看到Interceptor相信第一印象应该就是拦截器,例如在Okhttp中被广泛应用。自然在协程中ContinuationInterceptor的作用也是用来做拦截协程的。

下面来看下它的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface ContinuationInterceptor : CoroutineContext.Element {
/**
* The key that defines *the* context interceptor.
*/
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
/**
* Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
* This function is invoked by coroutines framework when needed and the resulting continuations are
* cached internally per each instance of the original [continuation].
*
* This function may simply return original [continuation] if it does not want to intercept this particular continuation.
*
* When the original [continuation] completes, coroutine framework invokes [releaseInterceptedContinuation]
* with the resulting continuation if it was intercepted, that is if `interceptContinuation` had previously
* returned a different continuation instance.
*/
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
...
}

只给出了关键部分,ContinuationInterceptor继承于CoroutineContext.Element,所以它也是CoroutineContext,同时提供了interceptContinuation方法,先记住这个方法后续会用到。

大家是否还记得在Kotlin协程实现原理系列的第一篇文章中,我们分析了CoroutineContext的内部结构,当时提到了它的plus方法,就是下面这段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}

在这里第一次看到了ContinuationInterceptor的身影,当时核心是为了分析CoroutineContext,所以只是提了plus方法每次都会将ContinuationInterceptor添加到拼接链的尾部。

不知道有没有老铁想过这个问题,为什么要每次新加入一个CoroutineContext都要调整ContinuationInterceptor的位置,并将它添加到尾部?

这里其实涉及到两点。

其中一点是由于CombinedContext的结构决定的。它有两个元素分别是left与element。而left类似于前驱节点,它是一个前驱集合,而element只是一个纯碎的CoroutineContext,而它的get方法每次都是从element开始进行查找对应Key的CoroutineContext对象;没有匹配到才会去left集合中进行递归查找。

所以为了加快查找ContinuationInterceptor类型的实例,才将它加入到拼接链的尾部,对应的就是element。

另一个原因是ContinuationInterceptor使用的很频繁,因为每次创建协程都会去尝试查找当前协程的CoroutineContext中是否存在ContinuationInterceptor。例如我们通过launch来看协程的启动。

1
2
3
4
5
6
7
8
9
10
11
12
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

如果你使用launch的默认参数,那么此时的Coroutine就是StandaloneCoroutine,然后调用start方法启动协程。

1
2
3
4
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}

在start中进入了CoroutineStart,对应的就是下面这段代码

1
2
3
4
5
6
7
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
when (this) {
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
CoroutineStart.LAZY -> Unit // will start lazily
}

因为我们使用的是默认参数,所以这里对应的就是CoroutineStart.DEFAULT,最终来到block.startCoroutineCancellable

1
2
3
4
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellable(Unit)
}

在这里我们终于看到了intercepted。

首先通过createCoroutineUnintercepted来创建一个协程(内部具体如何创建的这篇文章先不说,后续文章会单独分析),然后再调用了intercepted方法进行拦截操作,最后再resumeCancellable,这个方法最终调用的就是Continuation的resumeWith方法,即启动协程。

所以每次启动协程都会自动回调一次resumeWith方法。

今天的主题是ContinuationInterceptor所以我们直接看intercepted。

1
public expect fun <T> Continuation<T>.intercepted(): Continuation<T>

发现它是一个expect方法,它会根据不同平台实现不同的逻辑。因为我们是Android所以直接看Android上的actual的实现

1
2
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this

最终来到ContinuationImpl的intercepted方法

1
2
3
4
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }

在这里看到了熟悉的context,获取到ContinuationInterceptor实例,并且调用它的interceptContinuation方法返回一个处理过的Continuation。

多次调用intercepted,对应的interceptContinuation只会调用一次。

所以ContinuationInterceptor的拦截是通过interceptContinuation方法进行的。既然已经明白了它的拦截方式,我们自己来手动写一个拦截器来验证一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
val interceptor = object : ContinuationInterceptor {

override val key: CoroutineContext.Key<*> = ContinuationInterceptor

override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
println("intercept todo something. change run to thread")
return object : Continuation<T> by continuation {
override fun resumeWith(result: Result<T>) {
println("create new thread")
thread {
continuation.resumeWith(result)
}
}
}
}
}

println(Thread.currentThread().name)

lifecycleScope.launch(interceptor) {
println("launch start. current thread: ${Thread.currentThread().name}")

withContext(Dispatchers.Main) {
println("new continuation todo something in the main thread. current thread: ${Thread.currentThread().name}")
}

launch {
println("new continuation todo something. current thread: ${Thread.currentThread().name}")
}

println("launch end. current thread: ${Thread.currentThread().name}")
}

这里简单实现了一个ContinuationInterceptor,如果拦截成功就会输出interceptContinuation中对应的语句。下面是程序运行后的输出日志。

1
2
3
4
5
6
7
8
9
10
11
12
main
// 第一次launch
intercept todo something. change run to thread
create new thread
launch start. current thread: Thread-2
new continuation todo something in the main thread. current thread: main
create new thread
// 第二次launch
intercept todo something. change run to thread
create new thread
launch end. current thread: Thread-7
new continuation todo something. current thread: Thread-8

分析一下上面的日志,首先程序运行在main线程,通过lifecycleScope.launch启动协程并将我们自定义的intercetpor加入到CoroutineContext中;然后在启动的过程中发现我们自定义的interceptor拦截成功了,同时将原本在main线程运行的程序切换到了新的thread线程。同时第二次launch的时候也拦截成功。

到这里就已经可以证明我们上面对ContinuationInterceptor理解是正确的,它可以在协程启动的时候进行拦截操作。

下面我们继续看日志,发现withContext并没有拦截成功,这是为什么呢?注意看Dispatchers.Main。这也是接下来需要分析的内容。

另外还有一点,如果细心的老铁就会发现,launch start与launch end所处的线程不一样,这是因为在withContext结束之后,它内部还会进行一次线程恢复,将自身所处的main线程切换到之前的线程,但为什么又与之前launch start的线程不同呢?

大家不要忘了,协程每一个挂起后的恢复都是通过回调resumeWith进行的,然而外部launch协程我们进行了拦截,在它返回的Continuation的resumeWith回调中总是会创建新的thread。所以发生这种情况也就不奇怪了,这是我们拦截的效果。

整体再来看这个例子,它是不是像一个简易版的协程的线程切换呢?

CoroutineDispatcher

现在我们来看Dispatchers.Main,为什么它会导致我们拦截失败呢?要探究原因没有直接看源码更加直接有效的。

1
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

主要看它的类型,它返回的是MainCoroutineDispatcher,然后再看它是什么

1
public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {}

发现MainCoroutineDispatcher继承于CoroutineDispatcher,主角登场了,但还不够我们继续看CoroutineDispatcher是什么

1
2
3
4
5
6
7
8
9
10
11
12
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true

public abstract fun dispatch(context: CoroutineContext, block: Runnable)

public open fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatch(context, block)

public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
}

真想已经浮出水面了,原来CoroutineDispatcher实现了ContinuationInterceptor,说明CoroutineDispatcher也具有拦截器的功能。然后再结合CoroutineContext的性质,就很好解释为什么我们自定义的拦截器没有生效。

原因就是它与我们自定义的拦截器一样都实现了ContinuationInterceptor接口,一旦使用Dispatchers.Main就会替换掉我们自定义的拦截器。

因果关系弄明白了现在就好办了。我们已经知道它具有拦截功能,再来看CoroutineDispatcher提供的另外几个方法isDispatchNeeded与dispatch。

我们可以大胆猜测,isDispatchNeeded就是判断是否需要分发,然后dispatch就是如何进行分发,接下来我们来验证一下。

ContinuationInterceptor重要的方法就是interceptContinuation,在CoroutineDispatcher中直接返回了DispatchedContinuation对象,它是一个Continuation类型。那么自然重点就是它的resumeWith方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC_DEFAULT
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
}
}
}

这里我们看到了isDispatchNeeded与dispatch方法,如果不需要分发自然是直接调用原始的continuation对象的resumeWith方法,也就没有什么类似于线程的切换。

那什么时候isDispatcheNeeded为true呢?这就要看它的dispatcer是什么。

由于现在我们是拿Dispatchers.Main作分析。所以这里我直接告诉你们它的dispatcher是HandlerContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
/**
* Creates [CoroutineDispatcher] for the given Android [handler].
*
* @param handler a handler.
* @param name an optional name for debugging.
*/
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
...
}

它继承于HandlerDispatcher,而HandlerDispatcher继承于MainCoroutineDispatcher。

条件都符合,我们直接看isDispatchNeeded方法返回true的逻辑。

首先通过invokeImmediately判断,它代表当前线程是否与自身的线程相同,如何你外部使用者能够保证这一点,就可以直接使用Dispatcher.Main.immediate来避免进行线程的切换逻辑。当然为了保证外部的判断失败,最后也会通过Looper.myLooper() != handler.looper来进行校正。对于Dispatchers.Main这个的handle.looper自然是主线程的looper。

如果不能保证则invokeImmediately为false,直接进行线程切换。然后进入dispatch方法,下面是Dispatchers.Main中dispatch的处理逻辑。

1
2
3
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}

这个再熟悉不过了,因为这个时候的handler.post就是代表向主线程推送消息,此时的block将会在主线程进行调用。

这样线程的切换就完成。

所以综上来看,CoroutineDispatcher为协程提供了一个线程切换的统一判断与执行标准。

首先在协程进行启动的时候通过拦截器的方式进行拦截,对应的方法是interceptContinuation,然后返回一个具有切换线程功能的Continuation,在每次进行resumeWith的时候,内部再通过isDispatchNeeded进行判断当前协程的运行是否需要切换线程。如果需要则调用dispatch进行线程的切换,保证协程的正确运行。

如果我要自定义协程线程的切换逻辑,就可以通过继承于CoroutineDispatcher来实现,将它的核心方法进行自定义即可。

当然,如果你是在Android中使用协程,那基本上是不需要自定义线程的切换逻辑。因为kotlin已经为我们提供了日常所需的Dispatchers。主要有四种分别为:

  1. Dispatchers.Default: 适合在主线程之外执行占用大量CPU资源的工作
  2. Dispatchers.Main: Android主线程
  3. Dispatchers.Unconfined: 它不会切换线程,只是启动一个协程进行挂起,至于恢复之后所在的线程完全由调用它恢复的协程控制。
  4. Dispatchers.IO: 适合在主线程之外执行磁盘或网络I/O

最后我们再来简单提一下withContext。

withContext

CoroutineDispatcher虽然能够提供线程的切换,但这只是单方向的,因为它没有提供线程的恢复。

试想一下,我们有个网络请求,我们通过CoroutineDispatcher将线程切换到Dispatchers.IO,当拿到请求成功的数据之后,所在的线程还是IO线程,这样并不能有利于我们UI操作。所以为了解决这个问题kotlin提供了withContext,它不仅能够接受CoroutineDispatcher来帮助我们切换线程,同时在执行完毕之后还会帮助我们将之前切换掉的线程进恢复,保证协程运行的连贯性。这也是为什么官方推荐使用withContext进行协程线程的切换的原因。

而withContext的线程恢复原理是它内部生成了一个DispatchedCoroutine,保存切换线程时的CoroutineContext与切换之前的Continuation,最后在onCompletionInternal进行恢复。

1
2
3
4
5
6
7
8
internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
if (state is CompletedExceptionally) {
val exception = if (mode == MODE_IGNORE) state.cause else recoverStackTrace(state.cause, uCont)
uCont.resumeUninterceptedWithExceptionMode(exception, mode)
} else {
uCont.resumeUninterceptedMode(state as T, mode)
}
}

这个uCont就是切换线程之前的Continuation。具体实现就不在这分析了,感兴趣的老铁可以自己翻一翻源码。

本篇文章主要介绍了ContinuationInterceptor作用与如何拦截协程的,同时也分析了CoroutineDispatcher内部结构,进一步剖析了协程线程切换的原理。希望对学习协程的伙伴们能够有所帮助,敬请期待后续的协程分析。

项目

android_startup: 提供一种在应用启动时能够更加简单、高效的方式来初始化组件,优化启动速度。不仅支持Jetpack App Startup的全部功能,还提供额外的同步与异步等待、线程控制与多进程支持等功能。

AwesomeGithub: 基于Github的客户端,纯练习项目,支持组件化开发,支持账户密码与认证登陆。使用Kotlin语言进行开发,项目架构是基于JetPack&DataBinding的MVVM;项目中使用了Arouter、Retrofit、Coroutine、Glide、Dagger与Hilt等流行开源技术。

flutter_github: 基于Flutter的跨平台版本Github客户端,与AwesomeGithub相对应。

android-api-analysis: 结合详细的Demo来全面解析Android相关的知识点, 帮助读者能够更快的掌握与理解所阐述的要点。

daily_algorithm: 每日一算法,由浅入深,欢迎加入一起共勉。

支持一下
赞赏是一门艺术