Kotlin下Rxjava的基础用法及流式调用示例详解

前言

万事开头难,写文章也是,现在越来越不知道开头怎么写了,所以在前言中,简单介绍下RxJava吧,第一次听说还是以前做Android开发的时候,那时候好多库中都使用了Rxjava,而在网络请求中,也有很多都是使用Rxjava去写,但自己却没怎么在项目中写过,而在搜索资料中发现,微信中搜rxjava时,最多介绍他的还是Android开发者,所以今天来记录下。

而所谓的响应式编程,就是一种用于应用程序异步编程的技术,他是一个通用的思想,类似与AOP,不只是在java中才有。他专注于对数据的变化做出反应,例如,有一个数据源(这里被称为生产者),一个数据目标(这里被成为消费者),然后在将消费者连接到订阅者之后,响应式编程框架负责将生产者生产的数据推送给消费者,一个可观察对象可以有任意数量的订阅者。

而对于一些思想上的框架,类似于Spring,源码上大体还是比较难的,毕竟就算是人,在思想上跨越也是有难度的,但对于RxJava来说,源码也不是很多,所以在以后会尝试介绍他的源码实现,而使用Rxjava的好处不是在于实现了什么具体的技术功能,比如使用CGLIB可以实现动态代理的技术,使用JDBC可以进行数据查询,而没有rxjava,我们的代码还可以借助Java8的Stream、CompletableFuture来实现。

而rxjava的好处在于让代码更简洁、优雅,通过他的链式调用,消除嵌套等。

在下面的例子中,我们会使用Kotlin来做示范。

基础用法

在这里,Observable 字面意思是可观察者,他表示数据源,通常,一旦订阅者开始收听,他们就会开始提供数据,而just表示仅仅,仅仅生产的数据是一个"T",即泛型类型,在这里是String。

而subscribe表示订阅,当订阅后,他会收到Observable生产的数据,来消费。

fun main() {
 Observable.just("hello rxjava").subscribe {
 println(it)
 }
}
输出:
hello rxjava

fromXXX

而上面说到,just表示仅仅,在rxjava中,不仅仅是具体的数据,还可以是Callable、Array、Future对象等,详细可以看fromXXX等方法,最终的结果由rxjava调用后如Callable的结果后,传递给订阅者。

fun main() {
 Observable.fromCallable {
 println("callable")
 "hello rxjava"
 }.subscribe {
 println(it)
 }
}

create

这个方法给我了我们手动执行的能力,即传递数据到订阅者是我们手动执行的。

fun main() {
 Observable.create<String> {
 it.onNext("hello")
 it.onError(IllegalArgumentException("错误"))
 it.onComplete()
 }.subscribe ({
 println(it)
 },{
 println(it.message)
 },{
 println("完成")
 })
}

interval & timer

还可以通过interval实现固定间隔定时。

fun main() {
 val observable = Observable.interval(1, TimeUnit.SECONDS)
 observable.subscribe {
 println(it)
 }
 observable.subscribe {
 println(it)
 Thread.sleep(2000)
 }
 Thread.sleep(100000);
}

而timer方法则是延迟N时间后,发送数据到订阅者.

fun main() {
 val observable = Observable.timer(2, TimeUnit.SECONDS)
 observable.subscribe {
 println(it)
 }
 observable.subscribe {
 println(it)
 Thread.sleep(2000)
 }
 Thread.sleep(100000);
}

指定线程

而使用上面方法有一个好处,即生产者可以在子线程中完成,而实际消费的时候在主线程,这在Android可谓是一种福利,如下。

fun main() {
 val threadPool = Executors.newCachedThreadPool()
 val anyFuture = threadPool.submit(Callable {
 Thread.sleep(2000)
 "hello"
 })
 Observable.fromFuture(anyFuture).subscribe {
 println(it)
 }
}

而如果担心等待时间问题,可是使用第二个重载方法,指定一个超时时间,而subscribe还有两个主要参数我们没说,一个是error发生错误时回调,一个是complete完成时回调,但在发生错误后,complete是不会回调的。

fun main() {
 val threadPool = Executors.newCachedThreadPool()
 val anyFuture = threadPool.submit(Callable {
 Thread.sleep(2000)
 "hello"
 })
 Observable.fromFuture(anyFuture,1,TimeUnit.SECONDS).subscribe({
 println(it)
 },{
 println("错误")
 },{
 println("完成")
 })
}

observeOn & subscribeOn

但你以为这就结束了吗,不,rxjava提供了丰富的线程切换,observeOn & subscribeOn这两个方法就是用来指定在哪里运行,Schedulers.newThread()表示在新线程,但rxjava实现的线程中,是守护线程,也就是当主线程退出后,他们也会自动退出,而在下面的例子中,如果在最后不加sleep,会导致主线程退出后,rxjava的所有线程在可能没执行完成后也将退出。

fun main() {
 Observable.create<String> {
 println(Thread.currentThread().isDaemon)
 it.onNext("hello")
 }
 .observeOn(Schedulers.newThread())
 .subscribeOn(Schedulers.newThread())
 .subscribe {
 println(Thread.currentThread().name)
 println(it)
 }
 Thread.sleep(10000)
}

而如果想自定义线程,也是支持的。

fun createSchedulers(): Scheduler {
 return Schedulers.from {
 thread { it.run() }
 }
}
fun main() {
 Observable.create<String> {
 it.onNext("hello")
 }
 .observeOn(createSchedulers())
 .subscribeOn(Schedulers.newThread())
 .subscribe {
 println(Thread.currentThread().name)
 println(it)
 }
}

Flowable

Flowable可以看成Observable新的实现,他支持背压,而他的API和Observable相似,在最后会介绍背压。

流式调用

我们已经熟悉了Java Stream的好处,所以在这里简单看下rxjava的实现,用法都一样,如下,创建集合"a","b","c","d"

  • map将所有item前添加字符"1"。
  • filter将b结尾的数据过滤掉。
  • skip忽略前n个数据。
fun main() {
 Flowable.fromIterable(mutableListOf("a","b","c","d"))
 .map { "1${it}" }
 .filter { !it.endsWith("b") }
 .skip(1)
 .subscribe {
 println(it)
 }
}

所以最后收到的消息将是 1c、1d

当然他提供的这类API非常之多,就不介绍了。

背压

背压指的是遇到被观察者发送的消息太快,至于它的订阅者不能及时处理数据,而我们可以提供一种告诉被观察者遇到这种情况的策略。

这种场景有个前提条件,被观察者和订阅者在不同线程。

背压策略被定义在BackpressureStrategy,有五种。

MISSING

通过create方法创建的Flowable没有指定背压策略,不会对通过OnNext发送的数据做缓存或丢弃,需要下游通过背压操作符制定策略。

ERROR

如果缓存池数据超限,则抛出异常。

BUFFER

可以无限制添加数据。

DROP

如果缓存池满了,则丢弃。

LATEST

仅保留最新的onNext值,如果下游无法跟上,则覆盖之前的值。

如下,我们使用BUFFER策略,默认的缓存池大小是128,可以通过System.setProperty("rx3.buffer-size","5")指定,而这个策略会导致只有缓存池不满的情况下,才会生产数据并发送给订阅者。

fun main() {
 System.setProperty("rx3.buffer-size","5")
 Observable.interval(1,TimeUnit.MILLISECONDS)
 .toFlowable(BackpressureStrategy.BUFFER)
 .map { User(1) }
 .observeOn(Schedulers.newThread())
 .subscribe {
 Thread.sleep(1000)
 println("hander $it")
 }
 Thread.sleep(100000)
}

而如果我们改成DROP,那么最终只有5条数据被消费,其他全部丢弃。

fun main() {
 System.setProperty("rx3.buffer-size","5")
 Observable.range(1,999)
 .toFlowable(BackpressureStrategy.DROP)
 .map { User(1) }
 .observeOn(Schedulers.newThread())
 .subscribe {
 Thread.sleep(1000)
 println("hander $it")
 }
 Thread.sleep(100000)
}

其他就不做demo了。

作者:i听风逝夜

%s 个评论

要回复文章请先登录注册