功能型网页设计,百度seo报价,不属于网络推广方法,网站开发公司排名前十简介 Kotlin的Flow 是 Kotlin 在异步编程方面的一个重要组件#xff0c;它提供了一种声明式的、可组合的、基于协程的异步编程模型。Flow 的设计灵感来自于 Reactive Streams、RxJava、Flux 和其他异步编程库#xff0c;但它与 Kotlin 协程无缝集成#xff0c;并提供了一种更…
简介 Kotlin的Flow 是 Kotlin 在异步编程方面的一个重要组件它提供了一种声明式的、可组合的、基于协程的异步编程模型。Flow 的设计灵感来自于 Reactive Streams、RxJava、Flux 和其他异步编程库但它与 Kotlin 协程无缝集成并提供了一种更具 Kotlin 特性的 API。
Flow 是用于异步数据流的概念它可以看作是一系列异步的、可并发的值或事件的流。可以通过操作符链更改、过滤、转换这些流并且可以对这些流进行组合、合并、扁平化等操作。Flow 中的数据可以是异步的、非阻塞的也可以是懒加载的因此它非常适合处理类似于网络请求、数据库查询、传感器数据等异步任务。
使用示例
fun main() runBlocking {flow {// 上游发源地emit(111) //挂起函数emit(222)emit(333)emit(444)emit(555)}.filter { it 200 } //过滤.map { it * 2 } //转换.take(2) //从流中获取前 2 个元素.collect {println(it)}
}//对应输出
444
666Process finished with exit code 0看到这种链式调用是不是感觉很熟悉没错同RxJava一样Kotlin的Flow也分为上游和下游emm… 也可以理解Kotlin的Flow就是为了替代RxJava如果对RxJava有一定了解的话学习Flow就更容易了。
应用场景
异步任务处理Flow 可以方便地处理异步任务如网络请求、数据库查询等。UI 事件响应Flow 可以用于处理 UI 事件例如按钮点击、搜索操作等。数据流管道Flow 可以作为数据处理的管道从一个数据源源源不断地发射数据供后续处理或展示。数据流转换Flow 可以方便地对数据进行转换、过滤、分组等操作实现复杂的数据流处理逻辑。
下面来看一些常见的Flow使用示例感受下Flow的魅力
Flow 提供了各种操作符用于转换、过滤和组合流例如 map()、filter()、transform()、zip()、flatMapConcat() 等。这些操作符可以通过链式调用来对流进行链式操作。
Flow 还具有背压支持可以通过 buffer()、conflate()、collectLatest() 等操作符来控制流的发送速率从而避免生产者和消费者之间的资源不平衡问题。
1. 转换操作
map(): 将 Flow 中的每个元素转换为另一种类型。
fun createFlow(): FlowInt flow {for (i in 1..5) {delay(1000)emit(i)}
}fun main() runBlocking {createFlow().map { it * it } // 将元素平方.collect { value -println(value) // 打印平方后的值}
}
//对应输出
1
4
9
16
25Process finished with exit code 0take() 函数有以下几种重载形式
take(n: Int): 从流中获取前 n 个元素。takeWhile(predicate: (T) - Boolean): 获取满足条件的元素直到遇到第一个不满足条件的元素。takeLast(n: Int): 从流的末尾获取最后 n 个元素。
filter(): 根据给定的谓词函数过滤 Flow 中的元素。
fun createFlow(): FlowInt flow {for (i in 1..5) {delay(1000)emit(i)}
}fun main() runBlocking {createFlow().filter { it % 2 0 } // 过滤偶数.collect { value -println(value) // 打印偶数}
}
//对应输出
2
4Process finished with exit code 02. 组合操作
zip(): 将两个 Flow 的元素一对一地组合在一起。
fun createFlowA(): FlowInt flow {for (i in 1..5) {delay(1000)emit(i)}
}fun createFlowB(): FlowString flow {for (i in 5 downTo 1) {delay(1000)emit(Item $i)}
}fun main() runBlocking {createFlowA().zip(createFlowB()) { a, b - $a - $b } // 组合 FlowA 和 FlowB 的元素.collect { value -println(value) // 打印组合后的元素}
}
//对应输出
1 - Item 5
2 - Item 4
3 - Item 3
4 - Item 2
5 - Item 1Process finished with exit code 0flatMapConcat(): 将 Flow 中的元素扁平化为多个 Flow并按顺序连接起来。
fun createFlowOfList(): FlowListInt flow {for (i in 1..3) {delay(1000)emit(List(i) { it * it }) // 发出包含整数平方的列表}
}fun main() runBlocking {createFlowOfList().flatMapConcat { flowOfList - flowOfList.asFlow() } // 扁平化列表中的元素.collect { value -println(value) // 打印平方后的值}
}
//对应输出
0
0
1
0
1
4Process finished with exit code 0解释下为什么是这样的打印结果因为上面发送了三个FlowListInt第一个List元素个
数为1所以打印 索引的平方即只有一个元素 下表索引就是0输出打印0的平方还是0第二个List
元素个数为2返回索引下标01扁平化List后打印 01。以此类推...除了使用 flow{} 创建 Flow 以外还可以使用 flowOf() 这个函数
fun main() runBlocking {flowOf(1, 2, 3, 4, 5).collect { value -println(value) // 打印 Flow 中的元素}
}
//对应输出
1
2
3
4
5Process finished with exit code 0flowOf 函数是用于快速创建 Flow 的便捷方式。它接受可变数量的参数并将这些参数作为发射项放入到 Flow 中。这样我们就可以直接在 flowOf 中指定要发射的元素而无需使用流构建器 flow { }。
在某些场景下我们甚至可以把 Flow 当做集合来使用或者反过来把集合当做 Flow 来用。
Flow.toList(): toList() 是 Kotlin Flow 中的一个终端操作符。它用于将 Flow 中的元素收集到一个列表中并在该列表中返回。它将 Flow 中的所有元素收集起来然后在流完成时返回一个包含所有元素的列表。
以下是 toList() 的示例用法
fun createFlow(): FlowInt flow {for (i in 1..5) {delay(1000)emit(i)}
}fun main() runBlocking {val list: ListInt createFlow().toList() // 将 Flow 中的元素收集到列表中println(list) // 打印列表
}
//对应输出
[1, 2, 3, 4, 5]Process finished with exit code 0需要注意toList() 操作符会等待整个流完成然后将所有元素收集到列表中。因此如果 Flow 是一个无限流则可能永远不会完成或者在内存和计算资源耗尽之前无法完成。 List.asFlow(): asFlow() 是 Kotlin 标准库中 List 类的扩展函数用于将 List 转换为 Flow。它允许将 List 中的元素作为发射项逐个发送到 Flow 中。
以下是 asFlow() 的示例用法
fun main() runBlocking {val list listOf(1, 2, 3, 4, 5)list.asFlow() // 将 List 转换为 Flow.collect { value -println(value) // 打印 Flow 中的元素}
}
//对应输出
1
2
3
4
5Process finished with exit code 0asFlow() 的作用是将其他具有迭代性质的数据结构如 List、Array 等转换为 Flow以便能够使用 Flow 的操作符和函数来处理这些数据。这对于在流式数据处理中与现有数据结构进行集成非常有用。
值得注意的是使用 asFlow() 转换的 Flow 在发送元素时遵循迭代器的顺序。也就是说Flow 发射的元素的顺序与原始数据结构例如 List中的元素顺序相同。
到目前为止可知的三种创建 Flow 的方式:
Flow创建方式适用场景用法flow{}未知数据集flow { emit(getLock()) }flowOf()已知具体的数据flow(1,2,3)asFlow()数据集合list.asFlow()
由上面代码示例可以看出Flow的API总体分为三部分上游、中间操作、下游上游发送数据下游接收 处理数据其中最复杂的就是中间操作符下面就详细介绍下Flow的中间操作符。
中间操作符
生命周期
在学习中间操作符之前先了解下Flow的生命周期
创建流Flow creation通过使用 flow { ... } 构建器或其他 Flow 构建器创建一个流。在此阶段流是冷的不会发射任何值。收集流Flow collection通过调用 collect 函数或其他流收集操作符如 toList、first、reduce 等来收集流的值。在此阶段流会开始发射值并触发相关的操作。流完成Flow completion当发射的所有值都被消费后流会完成并标记为完成状态。此时流的生命周期结束。取消流Flow cancellation如果收集流的代码块被取消使用协程的 cancel 函数或者流的收集器被销毁如Activity 或 Fragment 被销毁则流的收集过程将被取消。
需要注意的是Flow 是基于协程的因此其生命周期与协程的生命周期密切相关。当协程被取消时与该协程相关联的流收集也会被取消所以在使用Flow封装网络请求时如果想取消某个请求即把相应的协程取消即可。
先来看下onStart、onCompletion
fun main() runBlocking {flow {emit(1)emit(2)emit(3)}.onStart {println(Flow started emitting values)}.onCompletion {println(Flow completed)}.collect { value -println(Received value: $value)}
}
//对应输出
Flow started emitting values
Received value: 1
Received value: 2
Received value: 3
Flow completedProcess finished with exit code 0onStart 函数允许在 Flow 开始发射元素之前执行一些操作包括添加日志、初始化操作等。 onCompletion 函数允许在 Flow 完成之后执行一些操作包括资源清理、收尾操作等。
并且onCompletion{} 在面对以下三种情况时都会进行回调
正常执行完毕出现异常被取消
异常处理
Flow中的catch操作符用于捕获流中的异常考虑到 Flow 具有上下游的特性catch 这个操作符的作用是和它的位置强相关的即只能捕获到上游异常而无法捕获到下游异常在使用时注意cache的位置。
看一段示例
fun main() runBlocking {flow {emit(1)emit(2)throw NullPointerException(Null error)emit(3)}.onStart {println(Flow started emitting values)}.catch {println(Flow catch)emit(-1)}.onCompletion {println(Flow completed)}.collect { value -println(Received value: $value)}
}
//对应输出
Flow started emitting values
Received value: 1
Received value: 2
Flow catch
Received value: -1
Flow completedProcess finished with exit code 0
需要注意catch和onCompletion 的执行顺序和其所处的位置有关出现异常时谁在上游谁先执行。
上下文切换
Flow 非常适合复杂的异步任务。在大部分的异步任务当中我们都需要频繁切换工作的线程。对于耗时任务我们需要线程池当中执行对于 UI 任务我们需要在主线程执行。
flowOn可以完美往我们解决这一问题
fun main() runBlocking {flow {emit(1)println(emit 1 in thread ${Thread.currentThread().name})emit(2)println(emit 2 in thread ${Thread.currentThread().name})emit(3)println(emit 3 in thread ${Thread.currentThread().name})}.flowOn(Dispatchers.IO).collect {println(Collected $it in thread ${Thread.currentThread().name})}
}
//对应输出
emit 1 in thread DefaultDispatcher-worker-2
emit 2 in thread DefaultDispatcher-worker-2
emit 3 in thread DefaultDispatcher-worker-2
Collected 1 in thread main
Collected 2 in thread main
Collected 3 in thread mainProcess finished with exit code 0默认不使用flowOn的情况下Flow中所有代码都是执行在主线程调度器上的当使用flowOn切换上下文环境后flowOn 上游代码将执行在其所指定的上下文环境中同cache操作符一样flowOn也与其位置是强相关的。
launchIn 用于启动流的收集操作的操作符
launchIn 操作符的语法如下
flow.launchIn(scope)其中flow 是待收集的流scope 是用于启动流收集的协程作用域。
下面通过两段示例感受下launchIn的作用 示例1
fun main() runBlocking {val flow flow {emit(1)emit(2)emit(3)}val job launch(Dispatchers.Default) {flow.collect { value -println(Collecting $value in thread ${Thread.currentThread().name})}}delay(1000)job.cancel()
}示例2:
fun main() runBlocking(Dispatchers.Default) {val flow flow {emit(1)emit(2)emit(3)}flow.flowOn(Dispatchers.IO).onEach {println(Flow onEach $it in thread ${Thread.currentThread().name})}.launchIn(this)delay(1000)
}launchIn源码
public fun T FlowT.launchIn(scope: CoroutineScope): Job scope.launch {collect() // tail-call
}上面代码中的onEach操作符的作用是对流中的每个元素进行处理而不改变流中的元素。它类似于其他编程语言中的 forEach 或 map 操作但 onEach 不会返回修改后的流而是继续返回原始流。
由于launchIn调用了collect(),所以它也是一个终止操作符上面两种方法都可以切换collect中的上下文环境看起来是感觉怪怪的哈因为是在协程作用域中使用withContext{}不更方便吗不过launchIn更大的作用是让其他操作符如 collect{}、filter{}等都运行在指定上下文环境中。
如果上面两个示例不好理解在看下这两个
示例1
fun main() runBlocking {val scope CoroutineScope(Dispatchers.IO)val flow flow {emit(1)println(Flow emit 1 in thread ${Thread.currentThread().name})emit(2)println(Flow emit 2 in thread ${Thread.currentThread().name})emit(3)println(Flow emit 3 in thread ${Thread.currentThread().name})}flow.filter {println(Flow filter in thread ${Thread.currentThread().name})it 1}.onEach {println(Flow onEach $it in thread ${Thread.currentThread().name})}.collect()delay(1000)
}
//对应输出
Flow filter in thread main
Flow emit 1 in thread main
Flow filter in thread main
Flow onEach 2 in thread main
Flow emit 2 in thread main
Flow filter in thread main
Flow onEach 3 in thread main
Flow emit 3 in thread mainProcess finished with exit code 0示例2
//只是把上面collect 换成了launchIn(scope)flow.filter {println(Flow filter in thread ${Thread.currentThread().name})it 1}.onEach {println(Flow onEach $it in thread ${Thread.currentThread().name})}.launchIn(scope)
//对应输出
Flow filter in thread DefaultDispatcher-worker-1
Flow emit 1 in thread DefaultDispatcher-worker-1
Flow filter in thread DefaultDispatcher-worker-1
Flow onEach 2 in thread DefaultDispatcher-worker-1
Flow emit 2 in thread DefaultDispatcher-worker-1
Flow filter in thread DefaultDispatcher-worker-1
Flow onEach 3 in thread DefaultDispatcher-worker-1
Flow emit 3 in thread DefaultDispatcher-worker-1Process finished with exit code 0这就一目了然了吧 需要注意由于 Flow 当中直接使用 withContext 是很容易引发其他问题的因此withContext 在 Flow 当中是不被推荐的即使要用也应该谨慎再谨慎。 终止操作符
Flow中的终止操作符包括下面几种
collect: 收集流中的元素并执行相应操作。toList, toSet: 将流收集为列表或集合。reduce, fold: 使用给定的累加器函数将流中的元素合并为单个值。
需要注意终止操作符后面不能再点出来其他操作符只能是Flow的最后一个操作符
Flow为什么被称为 “冷” 的与Channel 的区别是什么
Flow 被称为“冷”的主要原因是它是一种惰性的数据流。冷流意味着当没有收集者订阅该流时它是不会产生任何数据的。Flow 的执行是由收集者的需求来驱动的只有当有一个或多个收集者订阅了 Flow并调用了 collect 等收集操作时Flow 才会开始发射数据。
Flow与Channel 特性 Flow 是惰性的数据流Flow 是一种基于协程的异步数据流处理库在 Kotlin 中引入了响应式编程的思想。与其他响应式流框架如 RxJava相比Flow 是惰性的只有在有收集者订阅时才会开始发射数据。这使得 Flow 非常适合处理潜在的无限序列或需要异步处理的大量数据。 Channel 是热的通道Channel 是 Kotlin 中用于协程之间进行通信和协同工作的机制。与 Flow 不同Channel 是热的即使没有接收者它仍会持续发射数据。它可以用于多个协程之间传递数据、进行异步消息传递等情况。
区别 Flow 是基于被动订阅的模型数据的发射是由收集者的需求驱动的。每个收集者独立地订阅 Flow可以按自己的节奏处理数据。 Channel 是主动推送数据的模型数据的发送和接收是显式进行的。发送者可以将数据放入 Channel而接收者通过调用 Channel 的receive()函数主动获取数据。
适用场景 Flow 适合处理异步数据流例如网络请求结果、数据库查询结果等。它提供了各种操作符如 map、filter、transform 等来转换和处理数据流同时支持背压backpressure处理以避免生产者与消费者之间的压力失衡。 Channel 适合多个协程之间的通信和协同工作。它允许协程之间异步地发送和接收数据可以用于实现生产者-消费者模型、事件驱动模型等。 感谢朱涛 · Kotlin 编程第一课
由于是初学者对协程方面见解不深如有描述错误的地方欢迎批评指正不吝赐教