今天我们来聊聊Kotlin
的协程Coroutine
。
如果你还没有接触过协程,推荐你先阅读这篇入门级文章What? 你还不知道Kotlin Coroutine?
如果你已经接触过协程,但对协程的原理存在疑惑,那么在阅读本篇文章之前推荐你先阅读下面的文章,这样能让你更全面更顺畅的理解这篇文章。
Kotlin协程实现原理:Suspend&CoroutineContext
Kotlin协程实现原理:CoroutineScope&Job
如果你已经接触过协程,相信你都有过以下几个疑问:
- 协程到底是个什么东西?
- 协程的
suspend
有什么作用,工作原理是怎样的?
- 协程中的一些关键名称(例如:
Job
、Coroutine
、Dispatcher
、CoroutineContext
与CoroutineScope
)它们之间到底是怎么样的关系?
- 协程的所谓非阻塞式挂起与恢复又是什么?
- 协程的内部实现原理是怎么样的?
- …
接下来的一些文章试着来分析一下这些疑问,也欢迎大家一起加入来讨论。
ContinuationInterceptor
看到Interceptor
相信第一印象应该就是拦截器,例如在Okhttp
中被广泛应用。自然在协程中ContinuationInterceptor
的作用也是用来做拦截协程的。
下面来看下它的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public interface ContinuationInterceptor : CoroutineContext.Element { /** * The key that defines *the* context interceptor. */ companion object Key : CoroutineContext.Key<ContinuationInterceptor> /** * Returns continuation that wraps the original [continuation], thus intercepting all resumptions. * This function is invoked by coroutines framework when needed and the resulting continuations are * cached internally per each instance of the original [continuation]. * * This function may simply return original [continuation] if it does not want to intercept this particular continuation. * * When the original [continuation] completes, coroutine framework invokes [releaseInterceptedContinuation] * with the resulting continuation if it was intercepted, that is if `interceptContinuation` had previously * returned a different continuation instance. */ public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> ... }
|
只给出了关键部分,ContinuationInterceptor
继承于CoroutineContext.Element
,所以它也是CoroutineContext
,同时提供了interceptContinuation
方法,先记住这个方法后续会用到。
大家是否还记得在Kotlin协程实现原理系列的第一篇文章中,我们分析了CoroutineContext
的内部结构,当时提到了它的plus
方法,就是下面这段代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public operator fun plus(context: CoroutineContext): CoroutineContext = if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation context.fold(this) { acc, element -> val removed = acc.minusKey(element.key) if (removed === EmptyCoroutineContext) element else { // make sure interceptor is always last in the context (and thus is fast to get when present) val interceptor = removed[ContinuationInterceptor] if (interceptor == null) CombinedContext(removed, element) else { val left = removed.minusKey(ContinuationInterceptor) if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else CombinedContext(CombinedContext(left, element), interceptor) } } }
|
在这里第一次看到了ContinuationInterceptor
的身影,当时核心是为了分析CoroutineContext
,所以只是提了plus
方法每次都会将ContinuationInterceptor
添加到拼接链的尾部。
不知道有没有老铁想过这个问题,为什么要每次新加入一个CoroutineContext
都要调整ContinuationInterceptor
的位置,并将它添加到尾部?
这里其实涉及到两点。
其中一点是由于CombinedContext
的结构决定的。它有两个元素分别是left
与element
。而left
类似于前驱节点,它是一个前驱集合,而element
只是一个纯碎的CoroutineContext
,而它的get
方法每次都是从element
开始进行查找对应Key
的CoroutineContext
对象;没有匹配到才会去left
集合中进行递归查找。
所以为了加快查找ContinuationInterceptor
类型的实例,才将它加入到拼接链的尾部,对应的就是element
。
另一个原因是ContinuationInterceptor
使用的很频繁,因为每次创建协程都会去尝试查找当前协程的CoroutineContext
中是否存在ContinuationInterceptor
。例如我们通过launch
来看协程的启动。
1 2 3 4 5 6 7 8 9 10 11 12
| public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }
|
如果你使用launch
的默认参数,那么此时的Coroutine
就是StandaloneCoroutine
,然后调用start
方法启动协程。
1 2 3 4
| public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { initParentJob() start(block, receiver, this) }
|
在start
中进入了CoroutineStart
,对应的就是下面这段代码
1 2 3 4 5 6 7
| public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) = when (this) { CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion) CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion) CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion) CoroutineStart.LAZY -> Unit // will start lazily }
|
因为我们使用的是默认参数,所以这里对应的就是CoroutineStart.DEFAULT
,最终来到block.startCoroutineCancellable
1 2 3 4
| internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) = runSafely(completion) { createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellable(Unit) }
|
在这里我们终于看到了intercepted
。
首先通过createCoroutineUnintercepted
来创建一个协程(内部具体如何创建的这篇文章先不说,后续文章会单独分析),然后再调用了intercepted
方法进行拦截操作,最后再resumeCancellable
,这个方法最终调用的就是Continuation
的resumeWith
方法,即启动协程。
所以每次启动协程都会自动回调一次resumeWith
方法。
今天的主题是ContinuationInterceptor
所以我们直接看intercepted
。
1
| public expect fun <T> Continuation<T>.intercepted(): Continuation<T>
|
发现它是一个expect
方法,它会根据不同平台实现不同的逻辑。因为我们是Android
所以直接看Android
上的actual
的实现
1 2
| public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: this
|
最终来到ContinuationImpl
的intercepted
方法
1 2 3 4
| public fun intercepted(): Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it }
|
在这里看到了熟悉的context
,获取到ContinuationInterceptor
实例,并且调用它的interceptContinuation
方法返回一个处理过的Continuation
。
多次调用intercepted
,对应的interceptContinuation
只会调用一次。
所以ContinuationInterceptor
的拦截是通过interceptContinuation
方法进行的。既然已经明白了它的拦截方式,我们自己来手动写一个拦截器来验证一下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| val interceptor = object : ContinuationInterceptor { override val key: CoroutineContext.Key<*> = ContinuationInterceptor override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> { println("intercept todo something. change run to thread") return object : Continuation<T> by continuation { override fun resumeWith(result: Result<T>) { println("create new thread") thread { continuation.resumeWith(result) } } } } } println(Thread.currentThread().name) lifecycleScope.launch(interceptor) { println("launch start. current thread: ${Thread.currentThread().name}") withContext(Dispatchers.Main) { println("new continuation todo something in the main thread. current thread: ${Thread.currentThread().name}") } launch { println("new continuation todo something. current thread: ${Thread.currentThread().name}") } println("launch end. current thread: ${Thread.currentThread().name}") }
|
这里简单实现了一个ContinuationInterceptor
,如果拦截成功就会输出interceptContinuation
中对应的语句。下面是程序运行后的输出日志。
1 2 3 4 5 6 7 8 9 10 11 12
| main // 第一次launch intercept todo something. change run to thread create new thread launch start. current thread: Thread-2 new continuation todo something in the main thread. current thread: main create new thread // 第二次launch intercept todo something. change run to thread create new thread launch end. current thread: Thread-7 new continuation todo something. current thread: Thread-8
|
分析一下上面的日志,首先程序运行在main
线程,通过lifecycleScope.launch
启动协程并将我们自定义的intercetpor
加入到CoroutineContext
中;然后在启动的过程中发现我们自定义的interceptor
拦截成功了,同时将原本在main
线程运行的程序切换到了新的thread
线程。同时第二次launch
的时候也拦截成功。
到这里就已经可以证明我们上面对ContinuationInterceptor
理解是正确的,它可以在协程启动的时候进行拦截操作。
下面我们继续看日志,发现withContext
并没有拦截成功,这是为什么呢?注意看Dispatchers.Main
。这也是接下来需要分析的内容。
另外还有一点,如果细心的老铁就会发现,launch start
与launch end
所处的线程不一样,这是因为在withContext
结束之后,它内部还会进行一次线程恢复,将自身所处的main
线程切换到之前的线程,但为什么又与之前launch start
的线程不同呢?
大家不要忘了,协程每一个挂起后的恢复都是通过回调resumeWith
进行的,然而外部launch
协程我们进行了拦截,在它返回的Continuation
的resumeWith
回调中总是会创建新的thread
。所以发生这种情况也就不奇怪了,这是我们拦截的效果。
整体再来看这个例子,它是不是像一个简易版的协程的线程切换呢?
CoroutineDispatcher
现在我们来看Dispatchers.Main
,为什么它会导致我们拦截失败呢?要探究原因没有直接看源码更加直接有效的。
1
| public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
|
主要看它的类型,它返回的是MainCoroutineDispatcher
,然后再看它是什么
1
| public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {}
|
发现MainCoroutineDispatcher
继承于CoroutineDispatcher
,主角登场了,但还不够我们继续看CoroutineDispatcher
是什么
1 2 3 4 5 6 7 8 9 10 11 12
| public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true public abstract fun dispatch(context: CoroutineContext, block: Runnable) public open fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatch(context, block) public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation) }
|
真想已经浮出水面了,原来CoroutineDispatcher
实现了ContinuationInterceptor
,说明CoroutineDispatcher
也具有拦截器的功能。然后再结合CoroutineContext
的性质,就很好解释为什么我们自定义的拦截器没有生效。
原因就是它与我们自定义的拦截器一样都实现了ContinuationInterceptor
接口,一旦使用Dispatchers.Main
就会替换掉我们自定义的拦截器。
因果关系弄明白了现在就好办了。我们已经知道它具有拦截功能,再来看CoroutineDispatcher
提供的另外几个方法isDispatchNeeded
与dispatch
。
我们可以大胆猜测,isDispatchNeeded
就是判断是否需要分发,然后dispatch
就是如何进行分发,接下来我们来验证一下。
ContinuationInterceptor
重要的方法就是interceptContinuation
,在CoroutineDispatcher
中直接返回了DispatchedContinuation
对象,它是一个Continuation
类型。那么自然重点就是它的resumeWith
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| override fun resumeWith(result: Result<T>) { val context = continuation.context val state = result.toState() if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_ATOMIC_DEFAULT dispatcher.dispatch(context, this) } else { executeUnconfined(state, MODE_ATOMIC_DEFAULT) { withCoroutineContext(this.context, countOrElement) { continuation.resumeWith(result) } } } }
|
这里我们看到了isDispatchNeeded
与dispatch
方法,如果不需要分发自然是直接调用原始的continuation
对象的resumeWith
方法,也就没有什么类似于线程的切换。
那什么时候isDispatcheNeeded
为true
呢?这就要看它的dispatcer
是什么。
由于现在我们是拿Dispatchers.Main
作分析。所以这里我直接告诉你们它的dispatcher
是HandlerContext
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| override fun createDispatcher(allFactories: List<MainDispatcherFactory>) = HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main") internal class HandlerContext private constructor( private val handler: Handler, private val name: String?, private val invokeImmediately: Boolean ) : HandlerDispatcher(), Delay { /** * Creates [CoroutineDispatcher] for the given Android [handler]. * * @param handler a handler. * @param name an optional name for debugging. */ public constructor( handler: Handler, name: String? = null ) : this(handler, name, false) @Volatile private var _immediate: HandlerContext? = if (invokeImmediately) this else null override val immediate: HandlerContext = _immediate ?: HandlerContext(handler, name, true).also { _immediate = it } override fun isDispatchNeeded(context: CoroutineContext): Boolean { return !invokeImmediately || Looper.myLooper() != handler.looper } override fun dispatch(context: CoroutineContext, block: Runnable) { handler.post(block) } ... }
|
它继承于HandlerDispatcher
,而HandlerDispatcher
继承于MainCoroutineDispatcher
。
条件都符合,我们直接看isDispatchNeeded
方法返回true
的逻辑。
首先通过invokeImmediately
判断,它代表当前线程是否与自身的线程相同,如何你外部使用者能够保证这一点,就可以直接使用Dispatcher.Main.immediate
来避免进行线程的切换逻辑。当然为了保证外部的判断失败,最后也会通过Looper.myLooper() != handler.looper
来进行校正。对于Dispatchers.Main
这个的handle.looper
自然是主线程的looper
。
如果不能保证则invokeImmediately
为false
,直接进行线程切换。然后进入dispatch
方法,下面是Dispatchers.Main
中dispatch
的处理逻辑。
1 2 3
| override fun dispatch(context: CoroutineContext, block: Runnable) { handler.post(block) }
|
这个再熟悉不过了,因为这个时候的handler.post
就是代表向主线程推送消息,此时的block
将会在主线程进行调用。
这样线程的切换就完成。
所以综上来看,CoroutineDispatcher
为协程提供了一个线程切换的统一判断与执行标准。
首先在协程进行启动的时候通过拦截器的方式进行拦截,对应的方法是interceptContinuation
,然后返回一个具有切换线程功能的Continuation
,在每次进行resumeWith
的时候,内部再通过isDispatchNeeded
进行判断当前协程的运行是否需要切换线程。如果需要则调用dispatch
进行线程的切换,保证协程的正确运行。
如果我要自定义协程线程的切换逻辑,就可以通过继承于CoroutineDispatcher
来实现,将它的核心方法进行自定义即可。
当然,如果你是在Android
中使用协程,那基本上是不需要自定义线程的切换逻辑。因为kotlin
已经为我们提供了日常所需的Dispatchers
。主要有四种分别为:
Dispatchers.Default
: 适合在主线程之外执行占用大量CPU
资源的工作
Dispatchers.Main
: Android
主线程
Dispatchers.Unconfined
: 它不会切换线程,只是启动一个协程进行挂起,至于恢复之后所在的线程完全由调用它恢复的协程控制。
Dispatchers.IO
: 适合在主线程之外执行磁盘或网络I/O
最后我们再来简单提一下withContext
。
withContext
CoroutineDispatcher
虽然能够提供线程的切换,但这只是单方向的,因为它没有提供线程的恢复。
试想一下,我们有个网络请求,我们通过CoroutineDispatcher
将线程切换到Dispatchers.IO
,当拿到请求成功的数据之后,所在的线程还是IO
线程,这样并不能有利于我们UI
操作。所以为了解决这个问题kotlin
提供了withContext
,它不仅能够接受CoroutineDispatcher
来帮助我们切换线程,同时在执行完毕之后还会帮助我们将之前切换掉的线程进恢复,保证协程运行的连贯性。这也是为什么官方推荐使用withContext
进行协程线程的切换的原因。
而withContext
的线程恢复原理是它内部生成了一个DispatchedCoroutine
,保存切换线程时的CoroutineContext
与切换之前的Continuation
,最后在onCompletionInternal
进行恢复。
1 2 3 4 5 6 7 8
| internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) { if (state is CompletedExceptionally) { val exception = if (mode == MODE_IGNORE) state.cause else recoverStackTrace(state.cause, uCont) uCont.resumeUninterceptedWithExceptionMode(exception, mode) } else { uCont.resumeUninterceptedMode(state as T, mode) } }
|
这个uCont
就是切换线程之前的Continuation
。具体实现就不在这分析了,感兴趣的老铁可以自己翻一翻源码。
本篇文章主要介绍了ContinuationInterceptor
作用与如何拦截协程的,同时也分析了CoroutineDispatcher
内部结构,进一步剖析了协程线程切换的原理。希望对学习协程的伙伴们能够有所帮助,敬请期待后续的协程分析。
项目
android_startup: 提供一种在应用启动时能够更加简单、高效的方式来初始化组件,优化启动速度。不仅支持Jetpack App Startup
的全部功能,还提供额外的同步与异步等待、线程控制与多进程支持等功能。
AwesomeGithub: 基于Github
客户端,纯练习项目,支持组件化开发,支持账户密码与认证登陆。使用Kotlin
语言进行开发,项目架构是基于Jetpack&DataBinding
的MVVM
;项目中使用了Arouter
、Retrofit
、Coroutine
、Glide
、Dagger
与Hilt
等流行开源技术。
flutter_github: 基于Flutter
的跨平台版本Github
客户端,与AwesomeGithub
相对应。
android-api-analysis: 结合详细的Demo
来全面解析Android
相关的知识点, 帮助读者能够更快的掌握与理解所阐述的要点。
daily_algorithm: 每日一算法,由浅入深,欢迎加入一起共勉。