RxJava操作符系列二(下)

接上文

输出日志信息


  1. call:2 ConcatMap RxNewThreadScheduler-5 
  2. onNext: ConcatMap 101 ConcatMap 
  3. call:2 ConcatMap RxNewThreadScheduler-6 
  4. onNext: ConcatMap 102 ConcatMap 
  5. call:2 ConcatMap RxNewThreadScheduler-7 
  6. onNext: ConcatMap 103 ConcatMap 
  7. onCompleted: ConcatMap  

通过该操作符和flatMap输出的日志信息,很容易看出flatMap并没有保证数据源的顺序性,但是ConcatMap操作符保证了数据源的顺序性。在应用中,如果你对数据的顺序性有要求的话,就需要使用ConcatMap。若没有要求,二者皆可使用。

SwitchMap

当原始Observable发射一个新的数据(Observable)时,它将取消订阅并停止监视产生执之前那个数据的Observable,只监视当前这一个.


  1. Integer[] integers = {1, 2, 3}; 
  2. Observable.from(integers).switchMap(new Func1>() { 
  3.             @Override 
  4.             public Observable call(Integer integer) { 
  5.                 Log.e(TAG, "call: SwitchMap" + Thread.currentThread().getName()); 
  6.                 //如果不通过subscribeOn(Schedulers.newThread())在在子线程模拟并发操作,所有数据源依然会全部输出,也就是并发操作此操作符才有作用 
  7.                 //若在此通过Thread。sleep()设置等待时间,则输出信息会不一样。相当于模拟并发程度 
  8.                 return Observable.just((integer + 100) + "SwitchMap").subscribeOn(Schedulers.newThread()); 
  9.             } 
  10.         }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber() { 
  11.             @Override 
  12.             public void onCompleted() { 
  13.                 Log.e(TAG, "onCompleted: SwitchMap"); 
  14.             } 
  15.   
  16.             @Override 
  17.             public void onError(Throwable e) { 
  18.                 Log.e(TAG, "onError: SwitchMap"); 
  19.             } 
  20.   
  21.             @Override 
  22.             public void onNext(String s) { 
  23.                 Log.e(TAG, "onNext: SwitchMap "+s); 
  24.             } 
  25.         });  

输出日志信息


  1. call: SwitchMapmain 
  2. call: SwitchMapmain 
  3. call: SwitchMapmain 
  4. onNext: SwitchMap 106SwitchMap 
  5. onCompleted: SwitchMap  

当数据源较多时,并不一定是只输出最后一项数据,有可能输出几项数据,也可能是全部。

GroupBy

看到这个词你就应该想到了这个操作符的作用,就是你理解的含义,他将数据源按照你的约定进行分组。我们通过groupBy实行将1到10的数据进行就划分,代码如下


  1. Observable.range(1, 10).groupBy(new Func1() { 
  2.             @Override 
  3.             public Boolean call(Integer integer) { 
  4.                 return integer % 2 == 0; 
  5.             } 
  6.         }).subscribe(new Subscriber>() { 
  7.             @Override 
  8.             public void onCompleted() { 
  9.                 Log.e(TAG, "onCompleted:1 "); 
  10.             } 
  11.   
  12.             @Override 
  13.             public void onError(Throwable e) { 
  14.                 Log.e(TAG, "onError:1 "); 
  15.             } 
  16.   
  17.             @Override 
  18.             public void onNext(GroupedObservable booleanIntegerGroupedObservable) { 
  19.                 booleanIntegerGroupedObservable.toList().subscribe(new Subscriber>() { 
  20.                     @Override 
  21.                     public void onCompleted() { 
  22.                         Log.e(TAG, "onCompleted:2 " ); 
  23.                     } 
  24.   
  25.                     @Override 
  26.                     public void onError(Throwable e) { 
  27.                         Log.e(TAG, "onError:2 "); 
  28.                     } 
  29.   
  30.                     @Override 
  31.                     public void onNext(List integers) { 
  32.                         Log.e(TAG, "onNext:2 "+integers); 
  33.                     } 
  34.                 }); 
  35.             } 
  36.         });  

输出日志信息


  1. onNext:2 [1, 3, 5, 7, 9] 
  2. onCompleted:2 
  3. onNext:2 [2, 4, 6, 8, 10] 
  4. onCompleted:2 
  5. onCompleted:1  

在上面代码中booleanIntegerGroupedObservable变量有一个getKey()方法,该方法返回的是分组的key,他的值就是groupBy方法call回调所用函数的值,在上面也就是integer % 2 == 0的值,及true和false。有几个分组也是有此值决定的。

Scan

操作符对原始Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。

例如计算1+2+3+4的和


  1. Observable.range(1,4).scan(new Func2() { 
  2.             @Override 
  3.             public Integer call(Integer integer, Integer integer2) { 
  4.                 Log.e(TAG, "call: integer:"+integer+"  integer2 "+integer2); 
  5.                 return integer+integer2; 
  6.             } 
  7.         }).subscribe(new Subscriber() { 
  8.             @Override 
  9.             public void onCompleted() { 
  10.                 Log.e(TAG, "onCompleted: "); 
  11.             } 
  12.   
  13.             @Override 
  14.             public void onError(Throwable e) { 
  15.                 Log.e(TAG, "onError: " ); 
  16.             } 
  17.   
  18.             @Override 
  19.             public void onNext(Integer integer) { 
  20.                 Log.e(TAG, "onNext: "+integer ); 
  21.             } 
  22.         });  

输出日志信息


  1. onNext: 1 
  2. call: integer:1  integer2 2 
  3. onNext: 3 
  4. call: integer:3  integer2 3 
  5. onNext: 6 
  6. call: integer:6  integer2 4 
  7. onNext: 10 
  8. onCompleted:  

对于scan有一个重载方法,可以设置一个初始值,如上面代码,初始值设置为10,只需将scan加个参数scan(10,new Func2)。

Buffer

操作符将一个Observable变换为另一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合,如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。

示例代码


  1. Observable.range(10, 6).buffer(2).subscribe(new Subscriber>() { 
  2.             @Override 
  3.             public void onCompleted() { 
  4.                 Log.e(TAG, "onCompleted: "); 
  5.             } 
  6.   
  7.             @Override 
  8.             public void onError(Throwable e) { 
  9.                 Log.e(TAG, "onError: "); 
  10.             } 
  11.   
  12.             @Override 
  13.             public void onNext(List integers) { 
  14.                 Log.e(TAG, "onNext: " + integers); 
  15.             } 
  16.         });  

输出日志信息


  1. onNext: [10, 11] 
  2. onNext: [12, 13] 
  3. onNext: [14, 15] 
  4. onCompleted:  

上面一次性订阅两个数据,如果设置参数为6,就一次性订阅。buffer的另一重载方法buffer(count, skip)从原始Observable的第一项数据开始创建新的缓存(长度count),此后每当收到skip项数据,用count项数据填充缓存:开头的一项和后续的count-1项,它以列表(List)的形式发射缓存,取决于count和skip的值,这些缓存可能会有重叠部分(比如skip count时)。具体执行结果,你可以设置不同的skip和count观察输出日志,查看执行结果及流程。

Window

Window和Buffer类似,但不是发射来自原始Observable的数据包,它发射的是Observables,这些Observables中的每一个都发射原始Observable数据的一个子集,最后发射一个onCompleted通知。


  1. Observable.range(10, 6).window(2).subscribe(new Subscriber>() { 
  2.             @Override 
  3.             public void onCompleted() { 
  4.                 Log.e(TAG, "onCompleted1: "); 
  5.             } 
  6.   
  7.             @Override 
  8.             public void onError(Throwable e) { 
  9.                 Log.e(TAG, "onError1: "); 
  10.             } 
  11.   
  12.             @Override 
  13.             public void onNext(Observable integerObservable) { 
  14.                 Log.e(TAG, "onNext1: "); 
  15.                 tv1.append("\n"); 
  16.                 integerObservable.subscribe(new Subscriber() { 
  17.                     @Override 
  18.                     public void onCompleted() { 
  19.                         Log.e(TAG, "onCompleted2: "); 
  20.                     } 
  21.   
  22.                     @Override 
  23.                     public void onError(Throwable e) { 
  24.                         Log.e(TAG, "onError2: "); 
  25.                     } 
  26.   
  27.                     @Override 
  28.                     public void onNext(Integer integer) { 
  29.                         Log.e(TAG, "onNext2: "+integer); 
  30.                     } 
  31.                 }); 
  32.             } 
  33.         });  

输出日志信息


  1. onNext2: 10 
  2. onNext2: 11 
  3. onCompleted2: 
  4. onNext2: 12 
  5. onNext2: 13 
  6. onCompleted2: 
  7. onNext2: 14 
  8. onNext2: 15 
  9. onCompleted2: 
  10. onCompleted1:  

window和buffer一样也有不同的重载方法。这两个操作符相对其他操作符不太容易理解,可以去RxJava GitHub理解,里面有图示解析。当然最好的理解方式就是通过更改变量的值,去观察输出的日志信息。

好了,这篇文章就介绍到这里。若文中有错误的地方,欢迎指正。谢谢。

本文作者:佚名

来源:51CTO

时间: 2024-11-02 01:16:18

RxJava操作符系列二(下)的相关文章

RxJava操作符系列二(上)

在上篇文章RxJava操作符系列一我们介绍的操作符几乎都是创建被观察者的操作符,那么今天的这篇文章就介绍一下经常用到的转换操作符.话不多说,开始上车. Map 该操作符是对原始Observable发射的每一项数据运用一个函数,然后返回一个发射这些结果的Observable. 例如我们有一个整形数组的数据,当大于5时输出为true,则代码实现 Integer[] integers = {0, 9, 6, 4, 8};          Observable.from(integers).map(n

RxJava操作符系列三(上)

RxJava操作符系列传送门 RxJava操作符源码 https://github.com/xiehui999/fuseProgram RxJava操作符系列一 RxJava操作符系列二 前言 在之前的文章,我们介绍了一些Observable的创建以及数据转换的操作符,其中的一些数据转换的操作符理解还是有一定的难度的,但是相信如果敲一遍代码并且修改各种参数的值,去观察执行的日志,相信还是很容易的理解的.在官网,每个操作符都给出了图例,如果你对文字的理解不够清楚明白,也可以去参考图示帮助自己理解.

RxJava操作符系列三(下)

接上文 Take Take操作符可以修改Observable的行为,只返回前面的N项数据,然后发射完成通知,忽略剩余的数据. Observable.range(1,8)              .take(4)              .subscribe(new Subscriber<Integer>() {            @Override            public void onNext(Integer item) {               Log.e(TAG

VSTO之旅系列(二):创建Excel解决方案

原文:VSTO之旅系列(二):创建Excel解决方案   本专题概要 引言 创建VSTO项目 Excel对象模型 创建Excel外接程序 创建Excel文档级自定义项 小结   一.引言 也许很多朋友都没有听说过VSTO这个东西的,本人之前也同样也不知道的,但是由于工作的原因接触了这方面,由于VSTO方面国内的资料比较少,本人刚开始学习的时候都是参考MSDN的,但是上面很多资料都是英文的,可能学习起来会比较慢点,所以本人把最近一段时间学习的内容记录下来,一来是作为一个巩固的学习笔记,二来希望这些

Android高效率编码-第三方SDK详解系列(二)——Bmob后端云开发,实现登录注册,更改资料,修改密码,邮箱验证,上传,下载,推送消息,缩略图加载等功能

Android高效率编码-第三方SDK详解系列(二)--Bmob后端云开发,实现登录注册,更改资料,修改密码,邮箱验证,上传,下载,推送消息,缩略图加载等功能 我的本意是第二篇写Mob的shareSDK分享组件的,奈何需要去注册各平台的账号,还要审核,有些审核还挺久,就没办法,改为写这个Bmob了,相信大家对Bmob都是挺期待的吧,因为他作为Android后端的实现很好的支持,国内很多软件都在使用它,他的功能也是特别神奇,这里就不一一细说了,我们用实际的例子来见证他的神奇 官网:http://w

系列二VS项目软件配置工具介绍

原文:系列二VS项目软件配置工具介绍 Svn和VisualSvn介绍      在使用TortoiseSvn(SVN客户端)+ AnkhSvn(VS2008插件) +VisualSvn Server(版本控制服务器)进行源代码版本控制前,有必要先了解下Subversion(Svn).     Svn(Subversion)是近年来崛起的版本管理工具,是CVS的接班人.目前,绝大多数开源软件都使用Svn作为代码版本管理软件.      Svn客户端: Subversion的客户端有两类,一类是we

Bootstrap &lt;基础十二&gt;下拉菜单(Dropdowns)

原文:Bootstrap <基础十二>下拉菜单(Dropdowns) Bootstrap 下拉菜单.下拉菜单是可切换的,是以列表格式显示链接的上下文菜单.这可以通过与 下拉菜单(Dropdown) JavaScript 插件 的互动来实现. 如需使用下列菜单,只需要在 class .dropdown 内加上下拉菜单即可.下面的实例演示了基本的下拉菜单: <!DOCTYPE html> <html> <head> <title>Bootstrap

Android Camera开发系列(下)——自定义Camera实现拍照查看图片等功能

Android Camera开发系列(下)--自定义Camera实现拍照查看图片等功能 Android Camera开发系列(上)--Camera的基本调用与实现拍照功能以及获取拍照图片加载大图片 上篇讲的都是一些基本的使用,这篇就来自己定义一个相机了 参照Google API:http://developer.android.com/guide/topics/media/camera.html 我们还是在原来的demo上修改,新增一个button,点击跳转到CameraActivity,我们在

服务器部署十大问题系列二:配置与升级

本系列文章介绍数据中心服务器部署前后整个过程需要考虑到的十个基础问题.第一部分<服务器部署十大问题系列一:软件与硬件>中,我们介绍了新增服务器对当前设施有哪些影响,以及软硬件方面需要考虑的问题. 本文为系列二,主要介绍在配置服务器时应该注意的问题,因为即使是对最能干和熟练的IT专业人士来说,手动配置也不仅耗时也是个易错的过程.另外,新的服务器需要有正确的打补丁和升级方式等等. 服务器配置有没有清晰的模板? 新的服务器需要通过安装软件.设置服务器角色.设定IP地址,通过域名系统和Active D