协程是怎么切换线程的

本文我们来聊聊协程是如何实现切换线程的。要搞清楚这个问题,我觉得需要搞懂这几个知识点:

  1. Continuation,简单讲可以把它看成是Callback,回调。当协程调用suspend函数,协程会被挂起,当suspend函数执行完成后,会通过Continuation的resumeWith方法,将执行结果返回给协程让协程继续执行。
  2. ContinuationInterceptor顾名思义是Continuation拦截器,也就是Callback拦截器,它的作用就是让回调在指定的线程中执行。假设有这样一个场景,在主线程中开启协程,在子线程中执行耗时操作,当耗时操作执行完毕时,它需要执行回调方法,而回调是需要在主线程中执行的,协程框架内部在协程开启的时候就会通过拦截器将回调与主线程绑定在一起,让回调一定在主线程中执行
  3. CoroutineDispatcher是拦截器的子类,除了拥有拦截器的功能之外,它还有两个重要作用,isDispatchNeeded(context:CoroutineContext)决定回调是否需要分发到其它线程中执行, dispatch(context: CoroutineContext, block: Runnable)将回调分发到指定线程中执行
  4. ThreadContextElement处理协程中的线程的ThreadLocal相关的变量。ThreadLocal很好理解,就是线程私有变量,只能被当前线程访问。那么协程中为什么会有这ThreadLocal呢?举个例子,在同一个线程中执行两个协程,将协程命名为A、B,在执行协程时打印出协程的名称。因为是同一个线程,而且协程的名称是存储在ThreadLocal中的。所以在协程执行的时候,需要将ThreadLocal中保存协程名称的变量修改为当前协程的名称,协程执行完毕时,将变量重置。

首先通过一个例子来讲解Continuation

fun main() = runBlocking(Dispatchers.Main) { // 花括号中是Continuation
 suspendNoChangeThread()
 suspendChangeToIOThread()
 normalFunc()
}
fun normalFunc() {
 // do something
}
suspend fun suspendNoChangeThread() {
 suspendCoroutine<Unit> {
 it.resume(Unit)
 }
}
suspend fun suspendChangeToIOThread(): String {
 return withContext(Dispatchers.IO) {
 Thread.sleep(1000)
 return@withContext "OK"
 }
}

我们聚焦到main函数。我们可以把整个函数体当成Continuation

// 命名为namedContinuation避免混淆
val namedContinuation = {
 suspendNoChangeThread()
 suspendChangeToIOThread()
 normalFunc()
}

整个函数都是在runBlocking(Dispatchers.Main)中的Dispatchers.Main主线程中执行的。我们看到函数体中有两个suspend的函数,而且suspendChangeToIOThread是切换到IO线程执行的,那么当它执行完,会紧接着执行normalFunc()方法,而该方法是需要在主线程中调用的。那么说明在suspendChangeToIOThread()和normalFunc()之间有一次切换线程的过程。

我们都知道suspend修饰的函数,编译器会加上Continuation参数的。所以我们可以把suspendChangeToIOThread()等价成:

fun suspendChangeToIOThread(continuation:Continuation):String {
 return withContext(Dispatchers.IO) {
 Thread.sleep(1000)
 return@withContext "OK"
 }
}

continuation需要在主线程中执行,那么continuation参数是什么时候与主线程绑在一起的呢?我们需要跟踪runBlocking的调用链。最终会调用到Callable.kt中的startCoroutineCancellable方法

/**
 * Use this function to start coroutine in a cancellable way, so that it can be cancelled
 * while waiting to be dispatched.
 */
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
 createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}

intercepted()方法就是把一个continuation回调和回调所需要执行的线程绑定在一起

//ContinuationImpl.kt
public fun intercepted(): Continuation<Any?> =
 intercepted
 ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
 .also { intercepted = it }

以上?代码,通过协程上下文获取到ContinuationInterceptor,调用interceptContinuation(this)方法,生成DispatchedContinuation,很显然DispatchedContinuation是一个回调的同时,还通过CoroutineDispatcher指明回调在哪个线程中执行。

// CoroutineDispatcher.kt
 public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
 DispatchedContinuation(this, continuation)
// DispatchedContinuation.kt
internal class DispatchedContinuation<in T>(
 @JvmField val dispatcher: CoroutineDispatcher,
 @JvmField val continuation: Continuation<T>
) 

接下来我们通过isDispatchNeeded和dispatch方法是如何实现回调在指定线程中执行。

isDispatchNeeded判断是否有必要切换线程。假设当前在执行的函数所在的线程与回调目标线程是同一个线程,那就没必要切线程了,否则是需要切线程的。

我们以Android中的主线程为例,讲解

internal class HandlerContext private constructor(
 private val handler: Handler,
 private val name: String?,
 private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
 override fun isDispatchNeeded(context: CoroutineContext): Boolean {
 return !invokeImmediately || Looper.myLooper() != handler.looper
 }
 override fun dispatch(context: CoroutineContext, block: Runnable) {
 if (!handler.post(block)) {
 cancelOnRejection(context, block)
 }
 }
}

HandlerContext的功能就是协程Android库将线程切换到Handler所在的线程。一个特例,切换到主线程执行。

从isDispatchNeeded方法中我们可以看到,当前线程与目标线程不同时需要切换线程(Looper.myLooper() != handler.looper)。

通过dispatch方法,我们看到只需要执行handler.post(block)方法,就能把block切到指定线程中执行。

最后我们再简单讲解下withContext是如何实现线程切换的。

fun main() = runBlocking {
 withContext(Dispatchers.IO) {
 println("withContext " + coroutineContext[Job])
 delay(1000)
 }
 println("runBlocking" + coroutineContext[Job])
}

withContext内部会创建一个新的协程,而且会阻塞外部的协程。只有内部的协程执行完成后,外部的协程才会执行。所以我们看到的打印日志是 先打印withContext 然后再打印runBlocking。

看下源码,此处需要理解协程上下文知识,具体可以看下我之前写的文章

public suspend fun <T> withContext(
 context: CoroutineContext,
 block: suspend CoroutineScope.() -> T
): T {
 return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
 // compute new context
 val oldContext = uCont.context
 val newContext = oldContext.newCoroutineContext(context)
 newContext.ensureActive()
 // FAST PATH #1 -- new context is the same as the old one
 if (newContext === oldContext) {
 // 省略代码
 }
 // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed) 
 if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
 // 省略代码
 }
 // SLOW PATH -- use new dispatcher
 val coroutine = DispatchedCoroutine(newContext, uCont)
 block.startCoroutineCancellable(coroutine, coroutine)
 coroutine.getResult()
 }
}

withContext源码将线程切换分成三种情况:

  1. 无需切换线程,而且协程上下文都没改变(newContext === oldContext)
  2. 无需切换线程,但是协程上下文其它的内容发现变化,比如说CorotineName发生变化了。会启动UndispatchedCoroutine,该协程会更新ThreadContextElement,因为线程没发生变化,只需要改变ThreadLocal中的内容即可
  3. 需要切换线程。启动DispatchedCoroutine,block.startCoroutineCancellable方法则最终会调用到intercepted(),后续流程与上文相同。

下面代码分别对应上述3个case:

fun main() = runBlocking {
 // case1 
 withContext(EmptyCoroutineContext) {
 println("withContext " + coroutineContext[Job])
 delay(1000)
 }
 // case2
 withContext(CoroutineName("newName")) {
 println("withContext " + coroutineContext[Job])
 delay(1000)
 }
 // case3
 withContext(Dispatchers.IO) {
 println("withContext " + coroutineContext[Job])
 delay(1000)
 }
 println("runBlocking" + coroutineContext[Job])
}
作者:字节小站

%s 个评论

要回复文章请先登录注册