【Kotlin协程】避免在suspend函数中使用@Synchronized修饰符

    科技2022-08-13  89

    使用Kotlin时,我们通常使用@Synchronized实现线程间同步,因此很多刚接触协程的同学,视图在挂起函数上添加@Synchronized以实现”协程间同步”,这是否有效呢?

    1. 协程+Synchronized ?


    通常,协程可以帮助我们执行并行任务:

    suspend fun doSomething(i: Int) { println("#$i enter critical section.") // do something critical delay(1000) println("#$i exit critical section.") } fun main() = runBlocking { repeat(2) { i -> launch(Dispatchers.Default) { println("#$i thread name: ${Thread.currentThread().name}") doSomething(i) } } }

    从日志可以看出,两个任务的enter和exit并行输出,并没有先后顺序

    #0 thread name: DefaultDispatcher-worker-1 #1 thread name: DefaultDispatcher-worker-2 #0 enter critical section. #1 enter critical section. #1 exit critical section. #0 exit critical section.

    接下来添加@Synchronized试试看:

    @Synchronized suspend fun doSomething(i: Int) { println("#$i enter critical section.") // do something delay(1000) println("#$i exit critical section.") } fun main() = runBlocking { repeat(2) { i -> launch(Dispatchers.Default) { println("#$i thread name: ${Thread.currentThread().name}") doSomething(i) } } } #0 thread name: DefaultDispatcher-worker-2 #0 enter critical section. #1 thread name: DefaultDispatcher-worker-1 #1 enter critical section. #0 exit critical section. #1 exit critical section.

    对于普通函数,由于Synchronized的添加,两个线程应该顺序执行,但是上面日志显示,对于挂起函数,无论添加Synchronized与否,仍然是并行执行的(enter,exit 同时输出 )。

    我们换一种写法,在挂起函数内部添加Synchronized试试:

    val LOCK = Object() suspend fun doSomething(i: Int) { synchronized(LOCK) { println("#$i enter critical section.") // do something delay(1000) // <- The 'delay' suspension point is inside a critical section println("#$i exit critical section.") } } fun main() = runBlocking { repeat(2) { i -> launch(Dispatchers.Default) { println("#$i thread name: ${Thread.currentThread().name}") doSomething(i) } } }

    出现如下编译错误:

    "The 'delay' suspension point is inside a critical section"

    2. 协程同步需使用Mutex


    上线实验证明Synchronized无法用在协程同步的场景,协程同步应该使用Mutex

    val mutex = Mutex() suspend fun doSomething(i: Int) { mutex.withLock { println("#$i enter critical section.") // do something delay(1000) // <- The 'delay' suspension point is inside a critical section println("#$i exit critical section.") } } fun main() = runBlocking { repeat(2) { i -> launch(Dispatchers.Default) { println("#$i thread name: ${Thread.currentThread().name}") doSomething(i) } } } #0 thread name: DefaultDispatcher-worker-1 #1 thread name: DefaultDispatcher-worker-2 #1 enter critical section. #1 exit critical section. #0 enter critical section. #0 exit critical section.

    3. 挂起函数的本质


    为什么Synchrnoized无效呢?这需要从挂起函数的实现中寻找答案

    设想一个常见的前后端通信场景:

    远程获取token根据token创建post客户端显示

    普通的写法要借助CPS(Continuation-passing style),简单地说即回调

    class Item() class Post() //1 .获取token fun requestToken(callback: (String) -> Unit) { // ... remote service callback("token") } //2. 创建post fun createPost(token: String, item: Item, callback: (Post) -> Unit) { // ... remote service callback(Post()) } //3. 显示 fun processPost(post: Post) { // do post } fun postItem(item: Item) { requestToken { token -> createPost(token, item) { post -> processPost(post) } } }

    如果使用挂起函数实现同样逻辑:

    class Item() class Post() suspend fun requestToken(): String { // get token from api return "token" } suspend fun createPost(token: String, item: Item): Post { // create post return Post() } fun processPost(post: Post) { // do post } suspend fun postItem(item: Item) { val token = requestToken() val post = createPost(token, item) processPost(post) }

    挂起函数让我们摆脱了CPS带来的模板代码,但是其本质只不过是CPS的语法糖,反编译后的suspend函数依然要依靠回调完成功能:

    // kotlin suspend fun createPost(token: String, item: Item): Post { ... } // Java/JVM Object createPost(String token, Item item, Continuation<Post> cont) { ... }

    Continuation其实是一个callback

    interface Continuation<in T> { val context: CoroutineContext fun resume(value: T) fun resumeWithException(exception: Throwable) }

    上面例子中postItem中的一连串suspend函数调用,反编译后相当于多个callback的嵌套,只不过协程用label+递归调用的方式避免了嵌套:

    suspend fun postItem(item: Item, label: Int) { switch (label) { case 0: val token = requestToken() case 1: val post = createPost(token, item) case 2: processPost(post) } }

    这一连串调用是有状态的,所以定义ThisSM保存当前状态,ThisSM实现Continuation了resume接口,更新自身状态后通过resume流转到下一个处理阶段,实现所谓的状态机模型

    fun postItem(item: Item, cont: Continuation) { val sm = cont as? ThisSM ?: object : ThisSM { val initialCont = cont fun resume() { postIem(null, this) } } switch (sm.label) { case 0: sm.item = item sm.label = 1 requestToken(sm) case 1: val item = sm.item val token = sm.result as String sm.label = 2 createPost(token, item, sm) case 2: processPost(post) sm.initialCont.reusme() } }

    更多suspend函数的内容,可以参考 解密Kotlin协程的suspend修饰符

    4. 为什么Synchrnoized无效


    讲了这么多,为什么Synchrnoized对于suspend函数无效呢?

    再看一下开头的例子

    @Synchronized suspend fun doSomething(i: Int) { println("#$i enter critical section.") // do something delay(1000) println("#$i exit critical section.") }

    反编译后是这样的

    @Synchronized fun doSomething(i: Int, cont: Continuation) { val sm = cont as? ThisSM ?: ThisSM { ... } switch (sm.label) { case 0: println("#$i enter critical section.") sm.label = 1 delay(1000, sm) case 1: println("#$i exit critical section.") } }

    delay调用后,doSomething函数就退出了,Synchronized也就无效了,而delay也是异步调用,后续的doSomething已经不受lock影响了。所以只有 thread name和 enter 在日志上保持了串行,enter和exit 仍然是并行输出

    参考

    解密Kotlin协程的suspend修饰符

    Processed: 0.015, SQL: 8