使用Kotlin时,我们通常使用@Synchronized实现线程间同步,因此很多刚接触协程的同学,视图在挂起函数上添加@Synchronized以实现”协程间同步”,这是否有效呢?
通常,协程可以帮助我们执行并行任务:
suspend fun doSomething(i: Int) { println("#$i enter critical section.") // do something critical delay(1000) println("#$i exit critical section.") } fun main() = runBlocking { repeat(2) { i -> launch(Dispatchers.Default) { println("#$i thread name: ${Thread.currentThread().name}") doSomething(i) } } }从日志可以看出,两个任务的enter和exit并行输出,并没有先后顺序
#0 thread name: DefaultDispatcher-worker-1 #1 thread name: DefaultDispatcher-worker-2 #0 enter critical section. #1 enter critical section. #1 exit critical section. #0 exit critical section.接下来添加@Synchronized试试看:
@Synchronized suspend fun doSomething(i: Int) { println("#$i enter critical section.") // do something delay(1000) println("#$i exit critical section.") } fun main() = runBlocking { repeat(2) { i -> launch(Dispatchers.Default) { println("#$i thread name: ${Thread.currentThread().name}") doSomething(i) } } } #0 thread name: DefaultDispatcher-worker-2 #0 enter critical section. #1 thread name: DefaultDispatcher-worker-1 #1 enter critical section. #0 exit critical section. #1 exit critical section.对于普通函数,由于Synchronized的添加,两个线程应该顺序执行,但是上面日志显示,对于挂起函数,无论添加Synchronized与否,仍然是并行执行的(enter,exit 同时输出 )。
我们换一种写法,在挂起函数内部添加Synchronized试试:
val LOCK = Object() suspend fun doSomething(i: Int) { synchronized(LOCK) { println("#$i enter critical section.") // do something delay(1000) // <- The 'delay' suspension point is inside a critical section println("#$i exit critical section.") } } fun main() = runBlocking { repeat(2) { i -> launch(Dispatchers.Default) { println("#$i thread name: ${Thread.currentThread().name}") doSomething(i) } } }出现如下编译错误:
"The 'delay' suspension point is inside a critical section"上线实验证明Synchronized无法用在协程同步的场景,协程同步应该使用Mutex
val mutex = Mutex() suspend fun doSomething(i: Int) { mutex.withLock { println("#$i enter critical section.") // do something delay(1000) // <- The 'delay' suspension point is inside a critical section println("#$i exit critical section.") } } fun main() = runBlocking { repeat(2) { i -> launch(Dispatchers.Default) { println("#$i thread name: ${Thread.currentThread().name}") doSomething(i) } } } #0 thread name: DefaultDispatcher-worker-1 #1 thread name: DefaultDispatcher-worker-2 #1 enter critical section. #1 exit critical section. #0 enter critical section. #0 exit critical section.为什么Synchrnoized无效呢?这需要从挂起函数的实现中寻找答案
设想一个常见的前后端通信场景:
远程获取token根据token创建post客户端显示普通的写法要借助CPS(Continuation-passing style),简单地说即回调
class Item() class Post() //1 .获取token fun requestToken(callback: (String) -> Unit) { // ... remote service callback("token") } //2. 创建post fun createPost(token: String, item: Item, callback: (Post) -> Unit) { // ... remote service callback(Post()) } //3. 显示 fun processPost(post: Post) { // do post } fun postItem(item: Item) { requestToken { token -> createPost(token, item) { post -> processPost(post) } } }如果使用挂起函数实现同样逻辑:
class Item() class Post() suspend fun requestToken(): String { // get token from api return "token" } suspend fun createPost(token: String, item: Item): Post { // create post return Post() } fun processPost(post: Post) { // do post } suspend fun postItem(item: Item) { val token = requestToken() val post = createPost(token, item) processPost(post) }挂起函数让我们摆脱了CPS带来的模板代码,但是其本质只不过是CPS的语法糖,反编译后的suspend函数依然要依靠回调完成功能:
// kotlin suspend fun createPost(token: String, item: Item): Post { ... } // Java/JVM Object createPost(String token, Item item, Continuation<Post> cont) { ... }Continuation其实是一个callback
interface Continuation<in T> { val context: CoroutineContext fun resume(value: T) fun resumeWithException(exception: Throwable) }上面例子中postItem中的一连串suspend函数调用,反编译后相当于多个callback的嵌套,只不过协程用label+递归调用的方式避免了嵌套:
suspend fun postItem(item: Item, label: Int) { switch (label) { case 0: val token = requestToken() case 1: val post = createPost(token, item) case 2: processPost(post) } }这一连串调用是有状态的,所以定义ThisSM保存当前状态,ThisSM实现Continuation了resume接口,更新自身状态后通过resume流转到下一个处理阶段,实现所谓的状态机模型
fun postItem(item: Item, cont: Continuation) { val sm = cont as? ThisSM ?: object : ThisSM { val initialCont = cont fun resume() { postIem(null, this) } } switch (sm.label) { case 0: sm.item = item sm.label = 1 requestToken(sm) case 1: val item = sm.item val token = sm.result as String sm.label = 2 createPost(token, item, sm) case 2: processPost(post) sm.initialCont.reusme() } }更多suspend函数的内容,可以参考 解密Kotlin协程的suspend修饰符
讲了这么多,为什么Synchrnoized对于suspend函数无效呢?
再看一下开头的例子
@Synchronized suspend fun doSomething(i: Int) { println("#$i enter critical section.") // do something delay(1000) println("#$i exit critical section.") }反编译后是这样的
@Synchronized fun doSomething(i: Int, cont: Continuation) { val sm = cont as? ThisSM ?: ThisSM { ... } switch (sm.label) { case 0: println("#$i enter critical section.") sm.label = 1 delay(1000, sm) case 1: println("#$i exit critical section.") } }delay调用后,doSomething函数就退出了,Synchronized也就无效了,而delay也是异步调用,后续的doSomething已经不受lock影响了。所以只有 thread name和 enter 在日志上保持了串行,enter和exit 仍然是并行输出
解密Kotlin协程的suspend修饰符