Kotlin协程-从一到多

上一篇文章,我介绍了Kotlin协程的创建,使用,协作等内容。本篇将引入更多的使用场景,继续带你走进协程世界。

使用协程处理异步数据流

常用编程语言都会内置对同一类型不同对象的数据集表示,我们通常称之为容器类。不同的容器类适用于不同的使用场景。Kotlin的Flow就是在异步计算的需求下引入的,用于表示异步的数据流。

Flow

“问渠哪得清如许,为有源头活水来”,异步数据流的基本就是以某种方式获得异步数据。Kotlin提供了多种种方式,比较常用的就是Kotlin协程包的asFlow扩展和flow构造器。前者是对普通数据集的Flow化封装,没有更多可言,我们着重来看后者。
flow构造器的主要目标就是产生一个异步数据流,它是一个泛型函数,参数是一个挂起函数,并且是FlowCollector是扩展函数。这个接口只有一个emit方法,就是为创建的Flow提供异步计算的数据的,因为它是挂起函数,所以我们能在里面使用其他挂起函数计算异步值,然后通过emit方法将值发送出去,如此反复就能为下游操作提供源源不断的数据流了。
事情还没完,上面的步骤我们只是规定了创建数据的方式,并没有真正执行,也就是建好了道路,但是还没有车上路。那么,怎样才能让车在路上跑呢,查看Flow的接口会发现,它提供了collect方法来处理数据。collect接收一个挂起函数作为处理逻辑,但是同时,collect方法本身也是挂起函数,所以,这个方法只能在挂起函数中运行。有了这些知识,我们就可以写出最简单的异步数据流了。

 1uspend fun compute():Int{
             delay(123)
             return 1024
 }
 
 viewModelScope.launch {
     val flow=flow<Int> {
         emit(9527)
         emit(compute())
        delay(256)
        emit(256)
    }
    flow.collect {
        println(it)
    }
}

flow构造器里面随意做各种操作,只要在必要的时候传递结果就行了,但是需要注意的是,emit方法只能运行在同一个协程里。乍一看,这样分开写和写在一起并没有本质上的差别,但Flow还能做到更多。

该给Flow换个工作环境了

上一节,我们那个简单的示例,假如把构造器里面的数据获取方法换成网络请求,应用就歇菜了。因为它们都是运行在主线程里面的。那么这个时候,看过上一篇文章的小伙伴马上就会反应过来,用withContext方法在构造器里面切换线程就行了哇。思路是很对,因为Flow的默认配置就是构造器和collect方法工作在同一线程,既然现在主线程不让运行,那就把构造器的线程切换一下就行了呗。然后事实并不是这样,这样写出来的代码根本无法运行。因为官方提供了唯一的flowOn方法来切换构造器的执行线程。使用也很简单,就是对创建好的Flow对象配置一次flowOn方法就行了。

val flow=["1.jpg","2.jpg"].asFlow()
flow.map { decode(it) }
        .flowOn(Dispatchers.IO)
viewModelScope.launch {
    flow.collect{
        adapter.add(it)
    }

有些中间处理逻辑

熟悉RxJava的小伙伴可能有疑问了,这些操作RxJava也能完成,甚至还有更多的操作符来支持中间状态的处理,那么异步数据流能做到这些吗。毫无疑问,它可以。普通的数据集有map,filter等操作方法,对于异步数据流来说,这些方法同样适用。而且这些方法参数都是挂起函数,都可以执行异步操作。而且它还有个更灵活的transform方法,这个方法可以定制自己的操作符,实现更灵活的数据操作。

当然,上面那些操作符都只能实现单一异步流的操作,对于多数据流的支持,它也同样不在话下。zip可以将两个两个数据源两两合并起来,合成的数据流长度为两个数据流中最短的那个数据流的长度。combine则与zip不同,它会将两个数据流最近的发送数据作为输入,也就是说,假如一块一慢的两个数据源,慢的数据源的元素可能会被多次取到,从而最终的数据流比最短的那个都长。

val flow = flowOf(1, 2).delayEach(10)
val flow2 = flowOf("a", "b", "c").delayEach(15)
flow.combine(flow2) { i, s -> i.toString() + s }.collect {
     println(it) // Will print "1a 2a 2b 2c"
}

结束状态跟踪

上一节提到,由于数据源和处理逻辑不在同一个地方,所以很难确定最终的数据流大小,进而不知道数据流什么时候处理结束。而且中间操作也可能会改变数据流的大小,由此就更加难以确定数据处理结束的时机了。但是我们有的时候却需要在数据处理完成后做一些操作,该怎么办呢?这个时候当然是该onCompletion方法上场了。这个方法有一个可为空的Throwable类型参数,很显然,这可以同时指示两种处理结果,成功或者失败,失败就会将异常对象传递进来。

多个协程共同工作

很多时候,避免不了让多个协程共同工作。对于返回单个值的协程,上一篇我们也提到过了,可以传递async构造器的返回对象Deferred,但是局限性就是这个对象只能传递一个值。针对多值传递的情况,Kotlin提供了Channel的解决方法。Channel类似于阻塞队列,数据通过send方法发送出去,在另外的地方使用receive方法接收。通过这种方法,我们可以极大提供协程的工作效率。利用它就可以轻松实现生产者和消费者模型。

 val chanel=Channel<Int>()
 viewModelScope.launch(Dispatchers.IO) {
     for (i in 1..5){
         delay(1000)
         chanel.send(i)
     }
 }
 viewModelScope.launch { 
     for (i in chanel){
        println("Handle ${i}")
    }
}

当然,这只是最简单的用法,还可以加入更多的生产者,或者不再需要数据时取消,甚至还有专门的product构造器,直接获得返回多个值的协程对象。

总结

Kotlin协程有很多有用的API,这些API覆盖了大部分异步使用的场景。所以在使用协程的时候,我们首先需要明确使用场景,再根据使用场景确定使用哪一套API,这可以使我们避免陷入API恐惧症。为此,我根据这两篇文章的内容,整理出了一份情景表格,实际开发中可以参照使用。
Kotlin协程构造器

API 使用场景
launch 执行耗时操作,不需要返回值
async 需要获取耗时操作的单个返回值
produce 需要获取耗时操作的多个返回值

Kotlin协程协同工具

API 使用场景
Flow 操作异步数据流
Channel 协程间通信

青山不改,绿水长流,咱们下期见!

热门相关:冉冉心动