浅谈RxJava与2.0的新特性

简介

说起 RxJava ,相信诸多 Android 开发者都不会陌生。作为一个知名的响应式编程库,从前年开始逐渐变得火热,从小众到被众多 Android 开发者们广泛引入与流传,其在 GitHub 的 仓库 截止笔者写这篇文章时,已经有16400+个
star 。甚至有一些大牛专门为 Android 写了 RxJava 的适配库,如

为什么 RxJava 如此受到 Android 开发者们的欢迎。我想不外乎两个原因。 1. 异步 2. 链式操作

异步

对 Android 线程有所了解的朋友都知道, Android的 UI 绘制 与 事件响应是在主线程的,为了保证界面的流畅性,很多耗时操作如读写数据库、读写文件、请求网络,我们都会挪到异步线程去完成,再回调到主线程。当然在4.0以后主线程直接就不允许请求网络了。

在过去没有 RxJava 的时候,开发者一般都是通过 AsyncTask , Thread ,更好些的就是通过线程池来完成这些任务。而有了 RxJava 以后,简简单单的一句话就可以随意的切换线程,简直不用太舒服。

最典型的 RxJava 中的 Observable 类,提供了2个函数, 分别是 subscribeOn 与observeOn 。前者可以切换被观察时的线程(如果说数据发射的线程不够严谨,数据并非一定在观察时发射的,尤其是开发者自定义 OnSubscribe 时),后者可以切换数据被消费时的线程。

举一个切换线程的例子:

Log.i("debug", Thread.currentThread().getName());
Observable.empty()
        .doOnCompleted(new Action0() {
            @Override
            public void call() {
                Log.i("debug", Thread.currentThread().getName());
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnCompleted(new Action0() {
            @Override
            public void call() {
                Log.i("debug", Thread.currentThread().getName());
            }
        })
        .subscribe();

这里我们没有任何数据,就仅仅发射了一个 onComplete ,但是在切换线程的代码中,我们增加了 onComplte 时要额外执行的代码,输出结果如下:

08-27 10:47:41.173 6741-6741/com.dieyidezui.rxjavademo I/debug: main
08-27 10:47:41.201 6741-6762/com.dieyidezui.rxjavademo I/debug: RxIoScheduler-2
08-27 10:47:41.217 6741-6741/com.dieyidezui.rxjavademo I/debug: main

这仅仅是简单的例子, RxJava 提供了很多便捷的操作符供我们使用,如 map 、 filter 、 flatMap 、 merge 、 concat 等。可见当熟练使用后对我们的编程效率确实有很大帮助。尤其是
MVP 模式, RxJava 与之结合可谓是”天作之合”。

链式操作

上面笔者演示的代码其实就是 RxJava 的典型使用方式:

  1. 发射数据源
  2. 中间操作
  3. 处理结果

其中中间操作包含诸多用法, 如果切换线程,变换数据等。

为什么我说链式操作很好。第一,链式逻辑替代深度回调逻辑,容易编写,不易出 BUG 。第二,RxJava 提供诸多了整体处理数据的操作符,非常实用。第三,配合 Java8 的 lambda 表达式,使代码简短优雅。

好了,对 RxJava 的介绍就此为止了。进阶用法、原理剖析以后会有专门的文章。对 RxJava 不熟悉的同学,建议先去看一下官方的 wiki 。链接: https://github.com/ReactiveX/RxJava/wiki

RxJava2.0

前天, RxJava终于发布了2.0 RC1 版本,一直关注于此的笔者立刻就进去尝鲜了。结合官方的介绍,笔者总结并翻译了一些与 1.x 的异同与大家分享。

包名与MAVEN依赖

首先要说的就是 RxJava 2和1是 互相独立 的。因此包名与 maven 的依赖也是不一样的,就类似于 OkHttp 3与2一样。 RxJava 2.x的依赖是全新的 io.reactivex.rxjava2:rxjava:2.x.y ,并且类处于该 io.reactivex 包名下,而不再是 rx 。

接口变化

RxJava2 是遵循 Reactive
Streams Specification
 的规范完成的,新的特性依赖其提供的4个基础接口。分别是

  • Publisher
  • Subscriber
  • Subscription
  • Processor

Flowable与Observable

新的实现叫做 Flowable , 同时旧的 Observable 也保留了。因为在
RxJava1.x 中,有很多事件不被能正确的背压,从而抛出 MissingBackpressureException 。

举个简单的例子,在 RxJava1.x 中的 observeOn , 因为是切换了消费者的线程,因此内部实现用队列存储事件。在 Android 中默认的 buffersize
大小是16,因此当消费比生产慢时, 队列中的数目积累到超过16个,就会抛出 MissingBackpressureException , 初学者很难明白为什么会这样,使得学习曲线异常得陡峭。

而在 2.0 中,Observable 不再支持背压,而Flowable 支持非阻塞式的背压。并且规范要求,所有的操作符强制支持背压。幸运的是, Flowable 中的操作符大多与旧有的 Observable 类似。

Single、Completable

Single 与 Completable 都基于新的 Reactive Streams 的思想重新设计了接口,主要是消费者的接口, 现在他们是这样的:

interface SingleObserver<T> {
    void onSubscribe(Disposable d);
    void onSuccess(T value);
    void onError(Throwable error);
}

interface CompletableObserver<T> {
    void onSubscribe(Disposable d);
    void onComplete();
    void onError(Throwable error);
}

Subscriber

对比一下 Subscriber :

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

我们会发现和以前不一样的是多了一个 onSubscribe 的方法, Subscription 如下:

Subscription

public interface Subscription {
    public void request(long n);
    public void cancel();
}

熟悉 RxJava 1.x 的朋友能发现, 新的 Subscription 更像是综合了旧的 Producer与 Subscription 的综合体。他既可以向上游请求数据,又可以打断并释放资源。而旧的 Subscription 在这里因为名字被占,而被重新命名成了 Disposable

Disposable

public interface Disposable {
    void dispose();
    boolean isDisposed();
}

这里最大的不同就是这个 onSubscribe ,根据 Specification, 这个函数一定是第一个被调用的, 然后就会传给调用方一个 Subscription ,通过这种方式组织新的背压关系。当我们消费数据时,可以通过 Subscription 对象,自己决定请求数据。

这里就可以解释上面的非阻塞的背压。旧的阻塞式的背压,就是根据下游的消费速度,中游可以选择阻塞住等待下游的消费,随后向上游请求数据。而新的非阻塞就不在有中间阻塞的过程,由下游自己决定取多少,还有背压策略,如抛弃最新、抛弃最旧、缓存、抛异常等。

而新的接口带来的新的调用方式与旧的也不太一样, subscribe 后不再会有 Subscription 也就是如今的 Disposable,为了保持向后的兼容,
Flowable 提供了 subscribeWith方法 返回当前的 Subscriber 对象,
并且同时提供了 DefaultSubscriber , ResourceSubscriber , DisposableSubscriber ,让他们提供了 Disposable 接口,
可以完成和以前类似的代码:

ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {
    @Override
    public void onStart() {
        request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer t) {
        System.out.println(t);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("Done");
    }
};

Flowable.range(1, 10).delay(1, TimeUnit.SECONDS).subscribe(subscriber);

subscriber.dispose();

收回 create 方法权限

在RxJava 1.x 最明显的问题就是由于 create 的太过开放,导致其被开发者滥用,而不是学习使用提供的操作符。

并且用户对 RxJava 不够了解,导致各种各样的问题,如背压、异常处理等。

由于规范要求所有的操作符强制支持背压,因此新的 create 采用了保守的设计,让用户实现 FlowableOnSubscribe 接口,并选取背压策略,然后在内部实现封装支持背压,简单的例子如下:

Flowable.create((FlowableEmitter<Integer> emitter) -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onComplete();
}, BackpressureStrategy.BUFFER);

Functions可以抛出异常

新的 ActionX 、 FunctionX 的方法声明都增加了一个 throws
Exception
 ,这带来了显而易见的好处,现在我们可以这样写:

Flowable.just("file.txt")
.map(name -> Files.readLines(name))
.subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);

而在以前是不行的, 因为 Files.readLines(name) 会显式的抛出一个 IOException。这样对
lambda 更加友好,而不必再去 try catch 。

Scheduler可以直接schedule

在以前是必须要先 createWorker ,用 Worker 对象去 shedule, 现在可以直接在 Scheduler 用这些方法:

public abstract class Scheduler {

    public Disposable scheduleDirect(Runnable task) { ... }

    public Disposable scheduleDirect(Runnable task, long delay, TimeUnit unit) { ... }

    public Disposable scheduleDirectPeriodically(Runnable task, long initialDelay,
        long period, TimeUnit unit) { ... }

    public long now(TimeUnit unit) { ... }

    // ... rest is the same: lifecycle methods, worker creation
}

这算是一个小优化,方便开发者使用。

Observable的一些继承并入了Flowable中

如 ConnectableObservable 、 BlockObservable 等,这样可以直接在 Flowable 中写出这样的代码:

List<Integer> list = Flowable.range(1, 100).toList().blockingFirst();

其他修改

还有一些普通开发者不太在意的修改:

  • hook方式变化,现在可以通过提供接口在 runtime hook
  • 部分在 1.x 中 被标记 @Beta 、 @Experimental 的操作符现在合并到正式版里了
  • 由于类结构的变动,一些类名的变化

等其他变动。

结语

RxJava 作为开源的经典之作,笔者一直都有所关注。后续笔者会继续为大家带来 RxJava 的源码解析与进阶使用系列等。感谢大家的阅读,如有不知之处,欢迎讨论交流。

时间: 2024-09-16 23:01:11

浅谈RxJava与2.0的新特性的相关文章

浅谈 RxJava 与 2.0 的新特性

简介 说起 RxJava ,相信诸多 Android 开发者都不会陌生.作为一个知名的响应式编程库,从前年开始逐渐变得火热,从小众到被众多 Android 开发者们广泛引入与流传,其在 GitHub 的仓库截止笔者写这篇文章时,已经有16400+个 star .甚至有一些大牛专门为 Android 写了 RxJava 的适配库,如 RxAndroid RxBinding RxLifecycle 为什么 RxJava 如此受到 Android 开发者们的欢迎.我想不外乎两个原因. 1. 异步 2.

浅谈Android Studio 3.0 工具新特性的使用 Android Profiler 、Device File Explorer

前言: 其实 studio3.0的工具大家也已经使用过一段时间了,自己呢,就是从bate版开始使用的,我觉得比较好用的几个地方.就几个,可能还没用到其他的精髓. 但我觉的这个两个功能对我是比较实用的.好那么下面就给大家介绍一下吧. 正文: 话不多说咱们直接上图吧.(个人比较喜欢看图说话) 第一个(Android Profiler)我要介绍的就是这个了.(先看一下效果"震撼一下") (图-1) (图-2) (图-3) (厉害不厉害,牛逼不牛逼)那么我们怎么来操作这个工具呢,来咱们接着看图

浅谈区域经理如何开发一个新市场?

新市场的开发一直以来都是企业经营管理的重要策略,新区域的开拓意味着企业经营区域的扩大,意味着在行业市场份额的提升,意味着企业知名度.影响力的扩大.但是新区域的开拓也伴随着各种风险和企业管理成本的增加,风险诸如文化的适应.人员的稳定性及养成.区域的经营模式.客户对价值的认可程度以及新区域服务的能力等,都是一个新区域成长所要面临的诸多问题,提前 认识到这些问题,会让我们在市场开发的过程中,更加合理的评估我们开拓的进程以及有意识的规避风险. 我们暂且不谈我们什么时候该进入一个新区域,而从一个新区域的开

在低版本的vc中使用vc 10.0的新特性

/*! 在低版本的vc中使用vc 10.0的新特性 created by : andrew.wu (erpingwu@gmail.com) */ vc 10.0提供了一些新的特性,最引人注目的莫过于lambda, 但vs2010 beta内存占用之多也不得不让人心生退让. 高手 7cat 指出 "vc ide 只是一个壳" 那么如果利用 vc 10.0 编译器的新功能? 以vc9为例, 注意 Tools->Projects and Solutions->VC++ Direc

Spring 2.0的新特性点评

Spring2.0的发布恐怕算得上2006年Java社区的一件大事了.在Spring2.0发布附带的文档里面对2.0新特性做了概要的介绍,2.0的新特性是自然是我们最关注的方面: 一.Spring的XML配置引入XML Schema语法简化配置 在Spring1.x系列中,bean的配置文件使用DTD,没有namespace的分隔.2.0的一个非常大的改进是引入了XML Schema的namespace,因而可以将bean的配置文件做大幅度的简化.这些简化包括了对bean属性的各种简化,AOP配

《Ext JS实战》——1.4 Ext JS 3.0的新特性

1.4 Ext JS 3.0的新特性 Ext JS 2.0中引入的一些变化是颠覆性的,这就导致从级到2.0相当困难.这主要是因为这一版引入了一个更加现代的布局管理器以及一个崭新的.健壮的组件层次,许多Ext JS 1.x的代码都会因此而崩溃.值得庆幸的是,由于Ext JS 2.0的良好的工艺设计,从Ext JS 2.0到3.0的移植就非常容易了.尽管Ext JS 3.0新增的内容并不怎么神奇,不过最新的版本还是可圈可点的,有些新增的特性还是值得讨论的. 1.4.1 Ext JS通过Direct完

[测验]C# 3.0新特性也已经基本定稿了,各种资料层出不穷,但大家对C# 2.0的新特性有多少了解呢?测验一下。

问题描述 入门题:以下哪个特性不是C#2.0的新特性:A.::命名空间别名限定符B.运算符重载C.空值类型D.匿名方法E.迭代器语法记忆题:以下哪个关键字不是C#2.0新增的:A.yeildB.globalC.fromD.fixedE.where提高题:以下关于匿名方法的说法,哪个是错的?A.匿名方法没有方法名B.匿名方法以委托的形式存在C.匿名方法方法参数类型是自动推断的D.匿名方法返回值类型是自动推断的E.匿名方法不能直接赋值给没有方法签名的Delegate类型对象.终极题:嗯嗯,既然是终极

C#7.0中新特性汇总_C#教程

以下将是 C# 7.0 中所有计划的语言特性的描述.随着 Visual Studio "15" Preview 4 版本的发布,这些特性中的大部分将活跃起来.现在是时候来展示这些特性,你也告诉借此告诉我们你的想法! C#7.0 增加了许多新功能,并专注于数据消费,简化代码和性能的改善.或许最大的特性就是元祖和模式匹配,元祖可以很容易地拥有多个返回结果,而模型匹配可以根据数据的"形"的不同来简化代码.我们希望,将它们结合起来,从而使你的代码更加简洁高效,也可以使你更加

Struts 2.0的新特性

Struts 2.0的新特性 Struts 2.0框架中出现的许多特性旨在让Struts更容易使用: ● 改进的设计: 与Struts 1相比,Struts 2的所有类都基于接口,核心接口独立于HTTP.这些API并不依赖服务器小程序API. ● 简化的Action: Struts 2 Action类独立于框架,是简化的普通Java对象(POJO).拥有execute()方法的任何Java类都可以用做Action类. ● POJO表单: Struts 2不支持ActionForms特性.Acti