在之前分享过一篇文章 Google 推荐在 MVVM 架构中使用 Kotlin Flow ,在这篇文章中分析了如何在 MVVM 架构中使用 Kotlin Flow,以及 Kotlin Flow 为我们解决了以下问题:
LiveData 是一个生命周期感知组件,最好在 View 和 ViewModel 层中使用它,如果在 Repositories 或者 DataSource 中使用会有几个问题
它不支持线程切换,其次不支持背压,也就是在一段时间内发送数据的速度 > 接受数据的速度,LiveData 无法正确的处理这些请求使用 LiveData 的最大问题是所有数据转换都将在主线程上完成RxJava 虽然支持线程切换和背压,但是 RxJava 那么多傻傻分不清楚的操作符,实际上在项目中常用的可能只有几个例如 Observable 、 Flowable 、 Single 等等,如果我们不去了解背后的原理,造成内存泄露是很正常的事,大家可以从 StackOverflow 上查看一下,有很多因为 RxJava 造成内存泄露的例子
RxJava 入门的门槛很高,学习过的朋友们,我相信能够体会到从入门到放弃是什么感觉
解决回调地狱的问题
而相对于以上的不足,Flow 有以下优点:
Flow 支持线程切换、背压Flow 入门的门槛很低,没有那么多傻傻分不清楚的操作符简单的数据转换与操作符,如 map 等等Flow 是对 Kotlin 协程的扩展,让我们可以像运行同步代码一样运行异步代码,使得代码更加简洁,提高了代码的可读性易于做单元测试而这篇文章主要来分析一下 PokemonGo 搜索功能的实践,主要包含以下几个方面的内容:
Kotlin Flow 是什么?以及如何使用?如何区分末端操作符还是中间操作符?Kotlin Channel 是什么?以及如何使用?Kotlin Channel 都有那几种类型?BroadcastChannels 是什么?以及如何在项目中使用?StateFlow 是什么?以及如何在项目中使用?Kotlin 常用操作符 debounce 、filter 、flatMapLatest 、 distinctUntilChanged 解析?之前有很多朋友跟我反馈,如何使用 Flow 实现搜索功能,所以我在 PokemonGo 项目中增加了两种搜索场景,分别演示 BroadcastChannels 和 StateFlow 的用法。
使用 ConflatedBroadcastChannel 实现 DB 搜索使用 StateFlow 实现 NetWork 搜索在分析这两种实现方式之前,需要先了解几个基本概念, Flow 和 Channel 是什么,以及常用的操作符 debounce 、filter 、flatMapLatest 、 distinctUntilChanged 等等的使用,Flow 和 Channel 是一个比较大的概念,后面我会花好几篇文章来分析它们,本文只会概述它们之间的区别。
先来看看 Kotlin 官方文档是如何介绍 Flow
将上面这段话,简单的总结一下:
Flow 是非阻塞的,以挂起的方式执行,只有遇到末端操作符,才会触发所有操作的执行所有操作都在相同的代码块内顺序执行发射出来的值都是顺序执行的,只有在某一时刻结束(遇到 末端操作符 或者出现异常)map , filter , take , zip 等等是中间操作符,collect , collectLatest , single , reduce , toList 等等末端操作符中间操作符构建了一个待执行的调用链,如下图所示:不阻塞,以挂起的方式执行 :也就是协程作用域被挂起, 当前线程中协程作用域之外的代码不会阻塞
接下来我们来看一段示例:
suspend fun printValue() = flow<Int> { for (index in 1..10) { emit(index) } }.map { it -> it * it } // map, filter, take, zip 等等是中间操作符 .filter { it -> it > 5 } .toList() // 只有遇到末端操作符 collect, collectLatest,single, reduce, toList 等等才会触发所有操作的执行 遇到中间操作符,并不会执行任何操作,也不会挂起函数本身,这些操作符构建了一个待执行的调用链末端操作符是可挂起函数,遇到末端操作符会触发所有操作的执行区分末端操作符还是中间操作符,可以按照是否是挂起函数来区分,我个人觉得按照挂起函数来区分,方便去记忆上面提到的 Flow 的几个特点,当然也可以按照其他方式来区分,我们一起来分析一下源码。
// 中间操作符是 Flow 的扩展函数,它们最后都是通过 emit 来发射数据 public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value -> if (predicate(value)) return@transform emit(value) } // 末端操作符是一个挂起函数 // 末端操作符无论是 collectLatest,single, reduce, toList 最后都是调用 collect public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T> = toCollection(destination) public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C { collect { value -> destination.add(value) } return destination } 中间操作符是 Flow 的扩展函数,它们最后都是通过 emit 来发射数据末端操作符是一个挂起函数末端操作符无论是 collectLatest , single , reduce , toList 最后都是调用 collect来看看 Kotlin 官方文档是如何介绍 Channel
将上面这段话,简单的总结一下:
Channel 是非阻塞的,它用于发送方 (SendChannel) 和接收方 (ReceiveChannel) 之间通信Channel 实现了 SendChannel 和 ReceiveChannel 接口,所以既可以发送数据又可以接受数据Channel 和 Java 中的 BlockingQueue 类似,不同之处在于 BlockingQueue 是阻塞的,而 Channel 是挂起的发送方 (SendChannel) 和接收方 (ReceiveChannel) 之间通过缓冲区进行同步的,如下图所示: 通过发送方 (SendChannel) 将数据发送到缓冲区通过接收方 (ReceiveChannel) 从缓冲区获取数据发送方 (SendChannel) 和 接收方 (ReceiveChannel) 之间有一个通道,也就是缓冲区缓冲区的作用帮我们同步发送方 (SendChannel) 和 接收方 (ReceiveChannel) 发送和接受的数据,也就意味着多个协程可以向同一个 channel 发送数据, 一个 channel 的数据也可以被多个协程接收我们来实现一个简易的消息发送和接受的例子:
val channel = Channel<Int>() // 接受消息 suspend fun receiveEvent() { coroutineScope { while (!channel.isClosedForReceive) { // receive()方法异步获取元素,如果缓冲区是空,receive() 调用者将被挂起,直到一个新值被发送到缓冲区 // receive() 是一个挂起函数,用于同步发送方和接收方的一种机制 channel.receive() // poll()方法同步获取一个元素,如果缓冲区是空的,则返回null // channel.poll() } } } // 发送消息 suspend fun postEvent() { coroutineScope { if (!channel.isClosedForSend) { (1..10).forEach { // 如果缓冲区没有满,则立即添加元素, // 如果缓冲区满了调用者会被挂起 // send() 是一个挂起函数,用于同步发送方和接收方的一种机制 channel.send(it) // offer():如果缓冲区存在并且没有满立即向缓冲区添加一个元素 // 如果添加成功会返回true, 失败会返回 false // channel.offer(it) } } } }正如你所看到的 发送 和 接受 都有两个方法,分别来分析一下他们的区别。
send() 和 offer() 的区别:
send(element: E) :如果缓冲区没有满,则立即添加元素, 如果缓冲区满了调用者会被挂起,send() 方法是一个挂起函数,用于同步发送方和接收方的一种机制offer(element: E): Boolean :如果缓冲区存在并且没有满立即向缓冲区添加一个元素,添加成功会返回 true, 失败会返回 falsereceive() 和 poll() 的区别:
receive(): E :异步获取元素,如果缓冲区是空时调用者会被挂起,直到一个新值被发送到缓冲区,receive() 方法是一个挂起函数,用于同步发送方和接收方的一种机制poll(): E?:用于同步获取一个元素,如果缓冲区是空的,则返回 nullFlow 与 Channel 的区别:
Flow :中间操作符 (map , filter 等等) 会构建了一个待执行的调用链,只有遇到末端操作符 (collect , toList 等等) 才会触发所有操作的执行,所以 Flow 也被称为冷数据流Channel :发送方 (SendChannel) 发送数据,并不依赖于接受方(ReceiveChannel),所以 Channel 也被称为冷数据流Channel 对应着有四种不同的类型:
RendezvousChannel :这是默认的类型,大小为 0 的缓冲区,只有当 send() 方法和 receive() 方法都调用的时候,元素才会从发送方传输到接收方,否则将会被挂起LinkedListChannel :会创建一个容量无限的缓冲区 (受限于内存的大小) ,send() 方法远不会挂起,offer() 方法始终返回 trueConflatedChannel :最多缓冲一个元素,新元素会覆盖掉旧元素,只会接收最后发送的元素,之前的元素都会丢失,send() 方法永远不会挂起,offer() 方法始终返回 trueUnlimitedChannel :会创建一个固定容量的数组缓冲区,send() 方法仅在缓冲区满时挂起,receive() 方法仅在缓冲区为空时挂起创建四种不同类型 channel 的方式:
val rendezvousChannel = Channel<Int>() val bufferedChannel = Channel<Int>(30) val conflatedChannel = Channel<Int>(Channel.Factory.CONFLATED) val unlimitedChannel = Channel<Int>(Channel.Factory.UNLIMITED)来看看 Kotlin 官方文档是如何介绍 BroadcastChannels
BroadcastChannels 是非阻塞的,它用于发送方 (SendChannel) 和接收方 (ReceiveChannel) 之间通信BroadcastChannels 实现了 SendChannel 接口,所以只可以发送数据BroadcastChannels 提供了 openSubscription 方法,会返回一个新的 ReceiveChannel,可以从缓冲区获取数据通过 BroadcastChannels 发送的数据,所有接收方 (ReceiveChannel) 都会收到,如下图所示BroadcastChannels 是一个接口,而它的子类有 ConflatedBroadcastChannel、ArrayBroadcastChannel,这里主要介绍一下 ConflatedBroadcastChannel,ConflatedBroadcastChannel 重写了 openSubscription 方法。
public override fun openSubscription(): ReceiveChannel<E> { val subscriber = Subscriber(this) ...... // 省略很多无关的代码 return subscriber } openSubscription 方法返回一个 ReceiveChannel 作为接受者在 openSubscription 方法内,创建了一个 Subscriber 的实例Subscriber 其实是 ConflatedBroadcastChannel 的内部类,它实现了 ReceiveChannel 接口。
private class Subscriber<E>( private val broadcastChannel: ConflatedBroadcastChannel<E> ) : ConflatedChannel<E>(), ReceiveChannel<E>正如你所见 Subscriber 继承 ConflatedChannel 同时实现了 ReceiveChannel 接口,而 ConflatedChannel 在上文介绍过了,最多缓冲一个元素,新元素会覆盖掉旧元素,只会接收最后发送的元素,之前的元素都会丢失,所以 ConflatedBroadcastChannel 适合用来实现搜索相关的功能,因为用户只对最后一次搜索结果感兴趣。
注意: StateFlow 将会取代 ConflatedBroadcastChannel 下文有介绍
我在 PokemonGo 项目中增加了两种搜索场景,分别通过 BroadcastChannels 和 StateFlow 来实现,通过 ConflatedBroadcastChannel 实现 DB 搜索,只需要两步
1.在 Activity 中监听 ConflatedBroadcastChannel 的变化 src/main/java/com/hi/dhl/pokemon/ui/main/MainActivity.kt
// searchView 是一个 AppCompatEditText,当然你可以使用 androidx.appcompat.widget.SearchView,或者其他 searchView.addTextChangedListener { val result = it.toString() // 调用 queryParamterForDb 方法过滤用户的输入,并查询数据库 mViewModel.queryParamterForDb(result) } // 监听查询结果 mViewModel.searchResultForDb.observe(this, Observer { mPokemonAdapter.submitData(lifecycle, it) }) 接受用户输入的数据,并调用 queryParamterForDb 方法过滤用户的输入,然后查询数据库通过 searchResultForDb.observe 方法监听查询结果2. 在 MainViewModel 中实现 queryParamterForDb 方法 src/main/java/com/hi/dhl/pokemon/ui/main/MainViewModel.kt
// 根据关键词搜索 fun queryParamterForDb(paramter: String) = mChanncel.offer(paramter) // 使用 ConflatedBroadcastChannel 进行搜索 val searchResultForDb = mChanncel.asFlow() // 避免在单位时间内,快输入造成大量的请求 .debounce(200) // 避免重复的搜索请求。假设正在搜索 dhl,用户删除了 l 然后输入 l。最后的结果还是 dhl。它就不会再执行搜索查询 dhl // distinctUntilChanged 对于 StateFlow 任何实例是没有效果的 .distinctUntilChanged() .flatMapLatest { search -> // 只显示最后一次搜索的结果,忽略之前的请求 pokemonRepository.fetchPokemonByParameter(search).cachedIn(viewModelScope) } .catch { throwable -> // 异常捕获 }.asLiveData() 通过 mChanncel.offer 发送数据通过 mChanncel.asFlow() 方法,将 Channel 转换为 Flow 并调用 debounce 、 distinctUntilChanged 、 flatMapLatest 过掉用户的输入数据,这些操作符在后文会详细分析最后查询数据库,返回结果,项目中使用的是通过 Paging3 查询本地数据库,关于如何实现可以查看另外一篇文章 Jetpack 成员 Paging3 数据实践以及源码分析(一)重点: 在 Kotlin coroutines library (1.3.6) 版本中增加了一个新类 StateFlow,它的设计和 ConflatedBroadcastChannel 相同,将来计划完全取代 ConflatedBroadcastChannel
在前面的内容提到了很多次 StateFlow,那么 StateFlow 是什么,它与 Flows 和 Channels 有什么关系呢,来看看 Kotlin 官方文档是如何介绍 StateFlow
将上面这段话,简单的总结一下:
StateFlow 实现了 Flow 接口,它仅仅表示一种可读的状态,它的值是不变的,用于外部调用
public interface StateFlow<out T> : Flow<T> { public val value: T // val 关键字表示不可变的 }StateFlow 提供了一个可变的版本 MutableStateFlow,它的值是可变的,用于内部调用
public interface MutableStateFlow<T> : StateFlow<T> { public override var value: T // var 表示可变的 }StateFlow 与 Flow 的不同之处在于,StateFlow 仅仅表示一种状态,不依赖于特定的上下文,而 Flow 操作执行是在 CoroutineScope 内的,换句话说 StateFlow 不需要在协程的作用域内,它也可以执行
刚才我们提到 StateFlow 的出现是为了取代 ConflatedBroadcastChannel,那么它与 ConflatedBroadcastChannel 有什么不同之处:
StateFlow 实现更加简单,不需要实现所有 Channel API,而 ConflatedBroadcastChannel 在其内部封装了 ConflatedChannel 和 BroadcastChannels
StateFlow 内部有个变量 value,无论任何时候都可以安全的访问
StateFlow 实现读写分离,StateFlow 用来读而 MutableStateFlow 用来写
StateFlow 内部使用 Any.equals 来比较新值与旧值,和 distinctUntilChanged 方式相同,所以在 StateFlow 上应用 distinctUntilChanged 是没有效果的
StateFlow 源码:
if (oldState == newState) return // 如果值没有改变,不会做任何事distinctUntilChanged 源码
public fun <T, K> Flow<T>.distinctUntilChangedBy(keySelector: (T) -> K): Flow<T> = distinctUntilChangedBy(keySelector = keySelector, areEquivalent = { old, new -> old == new })StateFlow 和 ConflatedBroadcastChannel 一样,实现搜索功能只需要两步
1.在 Activity 中监听 ConflatedBroadcastChannel 的变化 src/main/java/com/hi/dhl/pokemon/ui/main/MainActivity.kt
// searchView 是一个 AppCompatEditText,当然你可以使用 androidx.appcompat.widget.SearchView 或者其他 searchView.addTextChangedListener { val result = it.toString() // 调用 queryParamterForNetWork 方法过滤用户的输入,并查询网络 mViewModel.queryParamterForNetWork(result) } mViewModel.searchResultMockNetWork.observe(this, Observer { // 网络搜索回调监听 }) 接受用户输入的数据,并调用 queryParamterForNetWork 方法过滤用户的输入,通过网络查询关键字通过 searchResultMockNetWork.observe 方法监听查询结果2. 在 MainViewModel 中实现 queryParamterForNetWork 方法 src/main/java/com/hi/dhl/pokemon/ui/main/MainViewModel.kt
// 根据关键词搜索 fun queryParamterForNetWork(paramter: String) { _stateFlow.value = paramter } // 因为没有合适的搜索接口,在这里模拟进行网络搜索 val searchResultMockNetWork = // 避免在单位时间内,快输入造成大量的请求 stateFlow.debounce(200) .filter { result -> if (result.isEmpty()) { // 过滤掉空字符串等等无效输入 return@filter false } else { return@filter true } } .flatMapLatest { // 只显示最后一次搜索的结果,忽略之前的请求 // 网络请求,这里替换自己的实现即可 } .catch { throwable -> // 异常捕获 } .asLiveData() 通过 _stateFlow.value 更新数据调用 debounce 、filter 、flatMapLatest 等等操作符过滤掉无效的请求在 PokemonGo 项目中使用 debounce 、filter 、flatMapLatest 、 distinctUntilChanged 等等操作符,一起来详细的分析一下这些操作符的含义,以及如何使用。
debounce 也叫做防抖动函数,当用户在很短的时间内输入 “d”,“dh”,“dhl”,但是用户可能只对 “dhl” 的搜索结果感兴趣,因此我们必须舍弃 “d”,“dh” 过滤掉不需要的请求,针对于这个情况,我们可以使用 debounce 函数,在指定时间内出现多个字符串,debounce 始终只会发出最后一个字符串,我们来看个例子。
val result = flow { emit("h") emit("i") emit("d") delay(90) emit("dh") emit("dhl") }.debounce(200).toList() println(result) // 最后输出:dhlfilter 操作符用于过滤不需要的字符串,在 PokemonGo 项目中只过滤了空字符串,我们来看个例子。
val result = flow { emit("h") emit("i") emit("d") delay(90) emit("dh") emit("dhl") }.filter { result -> if (!result.equals("dhl")) { return@filter false } else { return@filter true } }.toList() println(result) // 最后输出:dhlflatMapLatest 避免向用户展示不需要的结果,只提供最后一个搜索查询(最新)的结果,例如,正在查询 “dh”,然后用户输入 “dhl”, 这个时候用户对 “dh” 的结果不感兴趣,可能只对 “dhl” 的结果感兴趣,这个时候可以使用 flatMapLatest,我们来看个例子。
flow { emit("dh") emit("dhl") }.flatMapLatest { value -> flow<String> { delay(100) println("collected $value") // 最后输出 collected dhl } }.collect()注意: flatMapLatest 在 Kotlin coroutines library (1.3.20) 以下版本使用会出现以下错误。
IllegalStateException crash: call to 'resume' before 'invoke' with coroutineKotlin 团队在 Kotlin coroutines library (1.3.20) 以上修复了这个问题,如果出现这个问题,将版本升级到 1.3.20 以上即可 issues 地址。
我们一起来分析 distinctUntilChanged 操作符源码是如何实现的
public fun <T> Flow<T>.distinctUntilChanged(): Flow<T> = when (this) { is StateFlow<*> -> this else -> distinctUntilChangedBy { it } } distinctUntilChanged 是 Flow 的扩展函数如果当前对象是 StateFlow,直接返回调用者本身如果不是 StateFlow 就会调用 distinctUntilChangedBy 方法 public fun <T, K> Flow<T>.distinctUntilChangedBy(keySelector: (T) -> K): Flow<T> = distinctUntilChangedBy(keySelector = keySelector, areEquivalent = { old, new -> old == new })最后会调用 areEquivalent 方法进行比较,会过滤掉所有相同值的
全文到这里就结束了,效果图如下所示,如果效果图无法查看,请点击这里查看 效果图
文章中提到的 PokemonGo(神奇宝贝) 是基于 Jetpack + MVVM + Data Mapper + Repository + Paging3 + App Startup + Hilt + Kotlin Flow + Motionlayout + Coil 等等技术综合实战项目,点击这里前往查看
致力于分享一系列 Android 系统源码、逆向分析、算法、译文、Kotlin、Jetpack 源码相关的文章,如果本篇文章对你有帮助,请帮我点个 star,感谢!!!,欢迎一起来学习,在技术的道路上一起前进。
在国庆期间我梳理了 LeetCode / 剑指 offer 及国内外大厂面试题解,截止到目前为止我已经在 LeetCode 上 AC 了 124+ 题,每题都会用 Java 和 kotlin 去实现,并且每题都有多种解法、解题思路、时间复杂度、空间复杂度分析,题库逐渐完善中,欢迎前去查看。
剑指 offer 及国内外大厂面试题解:在线阅读LeetCode 系列题解:在线阅读最后推荐我一直在更新维护的项目和网站:
计划建立一个最全、最新的 AndroidX Jetpack 相关组件的实战项目 以及 相关组件原理分析文章,正在逐渐增加 Jetpack 新成员,仓库持续更新,欢迎前去查看:AndroidX-Jetpack-Practice
LeetCode / 剑指 Offer / 国内外大厂面试题,涵盖: 多线程、数组、栈、队列、字符串、链表、树,查找算法、搜索算法、位运算、排序等等,每道题目都会用 Java 和 kotlin 去实现,仓库持续更新,欢迎前去查看 Leetcode-Solutions-with-Java-And-Kotlin,剑指 offer 及国内外大厂面试题解:在线阅读,LeetCode 系列题解:在线阅读
最新 Android 10 源码分析系列文章,了解系统源码,不仅有助于分析问题,在面试过程中,对我们也是非常有帮助的,仓库持续更新,欢迎前去查看 Android10-Source-Analysis
整理和翻译一系列精选国外的技术文章,每篇文章都会有译者思考部分,对原文的更加深入的解读,仓库持续更新,欢迎前去查看 Technical-Article-Translation
「为互联网人而设计,国内国外名站导航」涵括新闻、体育、生活、娱乐、设计、产品、运营、前端开发、Android 开发等等网址,欢迎前去查看 为互联网人而设计导航网站
