Kotlin协程基础元素梳理分析

Kotlin 协程的基础元素:Continuation、SafeContinuation、CoroutineContext、CombinedContext、CancellationException、intrinsics。CombinedContext是 CoroutineContext 的一个实现类,SafeContinuation是 Continuation 的实现类。

Continuation 是什么?

class Call<T>(callBack: Call.CallBack<T>) {
    fun cancel() {
    }
    interface CallBack<T> {
        fun onSuccess(data: T)
        fun onFail(throwable: Throwable)
    }
}
suspend fun <T : Any> Call<T>.await(): T =
    suspendCancellableCoroutine<T> { continuation ->
        val call = Call(object : Call.CallBack<T> {
            override fun onSuccess(data: T) {
                continuation.resume(data, onCancellation = null)
            }
            override fun onFail(throwable: Throwable) {
                continuation.resumeWithException(throwable)
            }
        })
        continuation.invokeOnCancellation {
            call.cancel()
        }
    }

要实现挂起函数的时候,可以使用 suspendCoroutine{}、suspendCancellableCoroutine{}这两个高阶函数,在它们的 Lambda 当中,我们可以使用它暴露出来的 continuation 对象,把程序的执行结果或异常传到外部去。常用于实现挂起函数内部逻辑。

fun checkLength() {
    runBlocking {
        val result = getStrLengthSuspend("Kotlin")
        println(result)
    }
}
suspend fun getStrLengthSuspend(text:String):Int = suspendCoroutine {
    continuation ->
    thread {
        Thread.sleep(1000L)
        continuation.resume(text.length)
    }
}
Log
6
Process finished with exit code 0

使用 suspendCoroutine{}实现了挂起函数,在它内部,使用 continuation.resume() 的方式,传出了挂起函数的返回值。

为什么以 continuation.resume() 这样异步的方式传出结果,挂起函数就能接收到结果呢?

suspend fun getStrLengthSuspend(text: String): Int = suspendCoroutine { continuation ->
    thread {
        Thread.sleep(1000L)
        continuation.resume(text.length)
    }
}
fun checkLength2() {
    val func = ::getStrLengthSuspend as (String, Continuation<Int>) -> Any?
    func("Kotlin", object :Continuation<Int> {
        override val context: CoroutineContext
            get() = EmptyCoroutineContext
 
        override fun resumeWith(result: Result<Int>) {
            println(result.getOrNull())
        }
    })
    Thread.sleep(2000L)
}
Log
6
Process finished with exit code 0

把函数强转成了带有 Continuation 的函数类型,然后通过匿名内部类的方式,创建了一个 Continuation 对象传了进去。挂起函数的本质,就是 Callback!

把 Continuation 改为 Callback:

fun getStrLength() {
    func2("Kotlin", object : MyCallBack<Int> {
        override fun resume(value: Int) {
            println(value)
        }
    })
}
fun func2(text: String, callBack: MyCallBack<Int>) {
    thread {
        Thread.sleep(1000L)
        callBack.resume(text.length)
    }
}
interface MyCallBack<T> {
    fun resume(value: T)
}
Log
6
Process finished with exit code 0

Continuation 改成 Callback 以后,使用匿名内部类创建 Callback 用于接收异步结果;异步函数内部,使用 callback.resume() 将结果传出去。

Kotlin 协程当中的 Continuation,作用其实就相当于 Callback,它既可以用于实现挂起函数,往挂起函数的外部传递结果;也可以用于调用挂起函数,我们可以创建 Continuation 的匿名内部类,来接收挂起函数传递出来的结果。

Java 代码中如何调用 Kotlin 的挂起函数吗?

public static void main(String[] args) {
        SuspendFromJavaDo.INSTANCE.getUserInfo(new Continuation<String>() {
            @NonNull
            @Override
            public CoroutineContext getContext() {
                return EmptyCoroutineContext.INSTANCE;
            }
            @Override
            public void resumeWith(@NonNull Object o) {
                System.out.println(o + "");
            }
        });
    }
object SuspendFromJavaDo {
    suspend fun getUserInfo():String {
        delay(1000L)
        return "Kotlin"
    }
}

所以,实现挂起函数逻辑的时候,常用到 suspendCoroutine{}、suspendCancellableCoroutine{}。

Continuation.kt源码

public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext
    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
    contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
    return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
        val safe = SafeContinuation(c.intercepted())
        block(safe)
        safe.getOrThrow()
    }
}

suspendCoroutineUninterceptedOrReturn{}实现了suspendCoroutine{}。

val safe = SafeContinuation(c.intercepted()) 包裹Continuation。

block(safe)调用 Lambda 当中的逻辑。

safe.getOrThrow() 取出 block(safe) 的运行结果,Continuation 当中是可以存储 result 的。这个 Result 可能是正确的结果,也可能是异常。

@Suppress("UNUSED_PARAMETER", "RedundantSuspendModifier")
public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
    contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
    throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
}

为什么这个高阶函数的源代码会是抛出异常呢?

“Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic.”suspendCoroutineUninterceptedOrReturn 是一个编译器内建函数,它是由 Kotlin 编译器来实现的。

suspendCoroutineUninterceptedOrReturn 这个高阶函数的参数,它会接收一个 Lambda,类型是(Continuation<T>) -> Any?,这里的“Any?”类型,其实就能代表当前这个挂起函数是否真正挂起。

fun test() {
    runBlocking {
        val result = testNoSuspendCoroutinue()
        println(result)
    }
}
private suspend fun testNoSuspendCoroutinue() = suspendCoroutineUninterceptedOrReturn<String>{
    continuation ->
    return@suspendCoroutineUninterceptedOrReturn "Kotlin"
}
Log
Kotlin
Process finished with exit code 0

直接使用 suspendCoroutineUninterceptedOrReturn 实现了挂起函数,并且,在它的 Lambda 当中,我们并没有调用 continuation.resume()。在挂起函数的外部确实也可以接收到这个结果。

private static final Object testNoSuspendCoroutinue(Continuation $completion) {
      int var2 = false;
      if ("Kotlin" == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
         DebugProbesKt.probeCoroutineSuspended($completion);
      }
      return "Kotlin";
   }

这个函数其实就是一个伪挂起函数,它的内部并不会真正挂起。这样,当我们从外部调用这个函数的时候,这个函数会立即返回结果。

private suspend fun testSuspendCoroutinues() =
    suspendCoroutineUninterceptedOrReturn<String> { continuation ->
        thread {
            Thread.sleep(2000L)
            continuation.resume("Kotlin")
        }
        return@suspendCoroutineUninterceptedOrReturn kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
    }
fun main() {
    runBlocking {
        val testSuspendCoroutinues = testSuspendCoroutinues()
        println(testSuspendCoroutinues)
    }
}
Kotlin
Process finished with exit code 0

使用了 continuation.resume(),挂起函数的外部也能接收到这个结果。

private static final Object testSuspendCoroutinues(Continuation $completion) {
      int var2 = false;
      ThreadsKt.thread$default(false, false, (ClassLoader)null, (String)null, 0, (Function0)(new TestCoroutine777Kt$testSuspendCoroutinues$2$1($completion)), 31, (Object)null);
      Object var10000 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        //将 var10000 赋值为 COROUTINE_SUSPENDED 这个挂起标志位。
      if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
         DebugProbesKt.probeCoroutineSuspended($completion);
      }
//返回挂起标志位,代表 testSuspendCoroutinues() 这个函数会真正挂起。
      return var10000;
   }
BuildersKt.runBlocking$default((CoroutineContext)null, (Function2)(new Function2((Continuation)null) {
         int label;
         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            Object var10000;
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               this.label = 1;
               var10000 = TestCoroutine777Kt.testSuspendCoroutinues(this);
               if (var10000 == var3) {
                  return var3;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               var10000 = $result;
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
            String testSuspendCoroutinues = (String)var10000;
            System.out.println(testSuspendCoroutinues);
            return Unit.INSTANCE;
         }
         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }
         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      }), 1, (Object)null);

由于 suspend 修饰的函数,既可能返回 CoroutineSingletons.COROUTINE_SUSPENDED,也可能返回实际结果,甚至可能返回 null,为了适配所有的可能性,CPS 转换后的函数返回值类型就只能是 Any? 了。

suspendCoroutineUninterceptedOrReturn{}这个高阶函数的作用了:它可以将挂起函数当中的 Continuation 以参数的形式暴露出来,在它的 Lambda 当中,我们可以直接返回结果,这时候它就是一个“伪挂起函数”;或者,我们也可以返回 COROUTINE_SUSPENDED 这个挂起标志位,然后使用 continuation.resume() 传递结果。相应的,suspendCoroutine{}、suspendCancellableCoroutine{}这两个高阶函数,只是对它的一种封装而已。

作者:且听真言 原文地址:https://blog.csdn.net/zhangying1994/article/details/127820334

%s 个评论

要回复文章请先登录注册