RxJava中操作符到底做了什么?

RxJava今年彻底火了一把,其中最牛逼之处就是操作符了,以前只知道怎么用,这几天看了看源码,大致的弄清楚了操作符的工作过程,今天分享给大家。如果有什么不对地方,请大家多多指教。

今天我们已filter为例,看代码:


  1. Integer[] datas={1,2,3,4,5,6,7,8,9,10}; 
  2. Observable.from(datas) 
  3.         .filter(new Func1<Integer, Boolean>() { 
  4.             @Override 
  5.             public Boolean call(Integer integer) { 
  6.                 return integer>=5; 
  7.             } 
  8.         }) 
  9.         .subscribe(new Action1<Integer>() { 
  10.             @Override 
  11.             public void call(Integer integer) { 
  12.                 mText.append(integer.toString()+","); 
  13.             } 
  14.         }); 

一个很简单的小例子,用过滤操作符 filter 找出大于等于5的数字。我们点进去看看源码中filter做了什么


  1. public final Observable<T> filter(Func1<? super T, Boolean> predicate) {  
  2. return create(new OnSubscribeFilter<T>(this, predicate));  

调用了create()方法,等等我们什么时候是不是也用过create()
方法,我们在创建Observable时候也用过create()方法,原来创建了一个新的Observable返回出去了,那岂不是说我们的订阅者其实订阅的是这个新的Observable,我们继续往下看create方法,create方法需要的参数是一个OnSubscribe对象,那我们可以确定OnSubscribeFilter是OnSubscribe的一个实现类,我们点进去看看。


  1. public final class OnSubscribeFilter<T> implements OnSubscribe<T> { 
  2.     
  3.        final Observable<T> source; 
  4.     
  5.        final Func1<? super T, Boolean> predicate; 
  6.     
  7.        public OnSubscribeFilter(Observable<T> source, Func1<? super T, Boolean> predicate) { 
  8.            this.source = source; 
  9.            this.predicate = predicate; 
  10.        } 

果然不出我们所料,OnSubscribeFilter是OnSubscribe的实现类,我们看他的构造方法,传递了两个参数,第一个参数Observable对象,一个Func1,其中第一个参数就是我们我们自己创建的那个Observable,第二个参数使我们在外面写的Func1,然后保存了起来。我们都知道在subscribe()订阅的时候,OnSubscribe的call()方法。我们看看OnSubscribeFilter的call()方法都干了些什么


  1. @Override 
  2.         public void call(final Subscriber<? super T> child) { 
  3.             FilterSubscriber<T> parent = new FilterSubscriber<T>(child, predicate); 
  4.             child.add(parent); 
  5.             source.unsafeSubscribe(parent); 
  6.         } 

出现了一个FilterSubscriber,什么鬼玩意儿,我们看看他是什么鬼


  1.  
  2.       @Override 
  3.       public void onError(Throwable e) { 
  4.           if (done) { 
  5.               RxJavaHooks.onError(e); 
  6.               return; 
  7.           } 
  8.           done = true; 
  9.  
  10.           actual.onError(e); 
  11.       } 
  12.  
  13.  
  14.       @Override 
  15.       public void onCompleted() { 
  16.           if (done) { 
  17.               return; 
  18.           } 
  19.           actual.onCompleted(); 
  20.       } 
  21.       @Override 
  22.       public void setProducer(Producer p) { 
  23.           super.setProducer(p); 
  24.           actual.setProducer(p); 
  25.       } 
  26.   } 

一个Subscriber的子类,我们看他的构造方法,两个参数,一个Subscriber一个Func1,我们在创建对象时候Subscriber对象是我们真正的从外界传过来的观察者,Func1呢使我们创建OnSubscribeFilter时候传递进来的对象,也就是我们在外界定义的Func1。

回过头来我们继续看OnSubscribeFilter的call方法。我们看到source.unsafeSubscribe(parent),source是我们原来外界的Observable,他订阅了FilterSubscriber对象。我们在他的onNext方法中看到他根据func1.call(t)的返回值来判断是否让我们外界的真正的观察者调用onNext方法。

看到这里有没有恍然大悟,啥?我都不知道你在说啥,额,那我们整体的屡屡。

我们外界的代码,在subscribe()时候,Subscriber并不是订阅了我们自己写的Observable,Subscriber订阅的是filter方法返回的那个新的Observable对象,所以订阅时候会调用OnSubscribeFilter的call方法,OnSubscribeFilter才是我们订阅的被观察者的onSubscribe对象,在OnSubscribeFilter的call()方法中,我们让我们包装的FilterSubscriber订阅我们原来的被观察者,也就是我们在外界生成的那个Observable。我们在外界的Observable的onSubscribe对象的call方法中得到的观察者是FilterSubscriber对象,我们调用的onNext会回调到FilterSubscriber的onNext方法中。在FilterSubscriber的onNext方法中我们根据我们传递的Func1来判断是否要回调真正的Subscriber的onNext方法,在为true的时候我们才回调我们外界的观察者的onNext方法,也就起到了过滤的作用。这就是Filter的整个的流程。

我们来测试下我们的小结论:


  1. Observable.create(new Observable.OnSubscribe<Integer>() { 
  2.                @Override 
  3.                public void call(Subscriber<? super Integer> subscriber) { 
  4.                    Log.e("call:subscriber", "" + subscriber.getClass().getCanonicalName()); 
  5.                    subscriber.onNext(5); 
  6.                } 
  7.            }).filter(new Func1<Integer, Boolean>() { 
  8.                @Override 
  9.                public Boolean call(Integer integer) { 
  10.                    return integer > 0; 
  11.                } 
  12.            }).subscribe(new Action1<Integer>() { 
  13.                @Override 
  14.                public void call(Integer integer) { 
  15.                     
  16.                } 
  17.            }); 

作者:我是程序猿

来源:51CTO

时间: 2024-12-09 20:22:31

RxJava中操作符到底做了什么?的相关文章

Java的RxJava库操作符的用法及实例讲解_java

操作符就是为了解决对Observable对象的变换的问题,操作符用于在Observable和最终的Subscriber之间修改Observable发出的事件.RxJava提供了很多很有用的操作符. 比如map操作符,就是用来把把一个事件转换为另一个事件的. Observable.just("Hello, world!") .map(new Func1<String, String>() { @Override public String call(String s) { r

UI到底做什么

  UI到底做什么,要从UI接到的工作流程来说--产品/交互/设计.也就是说一般我们接到的是交互稿. 这时候UI看到一个交互设计,需要考虑的是: 1.这样的排布是否合理(比如960的屏和1136的屏幕是否都可以容下足够的设计关键信息) 2.信息重点在哪里? 3.用户人群是哪一类? 等等分析都是自己的事.另外, 通常我们拿到的交互稿是这样子的: 产出却要求是这样子的: 那么这中间的坑和鸿沟就是我们自己填. 在解决完上面的问题后,我们开始着手UI设计: 1.从产品需求入手,考虑我们到底要用什么主色调

聊聊UI DESIGN到底做什么的

  新人初涉UI领域,除了掌握必要的技能点,想尽快上手,了解整个设计流程也很重要,今天血儿同学聊聊自己接单的7个步骤,以供参考 >>> 血儿:UI到底做什么,要从UI接到的工作流程来说--产品/交互/设计.也就是说一般我们接到的是交互稿. 这时候UI看到一个交互设计,需要考虑的是: 1.这样的排布是否合理(比如960的屏和1136的屏幕是否都可以容下足够的设计关键信息) 2.信息重点在哪里? 3.用户人群是哪一类? 等等分析都是自己的事.另外,通常我们拿到的交互稿是这样子的: 产出却要求

qt c++-Qt中,到底如何实现主窗口和子窗口之间的通信?

问题描述 Qt中,到底如何实现主窗口和子窗口之间的通信? RT,比如,当子窗口关闭时,重新打开主窗口.这个是如何通信的,希望能给上例子.我知道是用信号和槽,可是两个窗口(类)之间的信号和槽我还不太会.网上也没找到具体的例子.希望大婶们能给个好点的直观的例子. 解决方案 主窗体类为A,子窗体类为B,在A中实例化B,其对象为b,关闭b,但不要释放b,调用b的public方法返回数据.仔细理解. void A::buttonClick(){ B b; b.exec(); b.getData();} 或

RxJava 中的多线程

本文讲的是RxJava 中的多线程, 原文地址:Multithreading with RxJava 原文作者:Pierce Zaifman 译文出自:掘金翻译计划 译者:PhxNirvana 校对者:yazhi1992.stormrabbit RxJava 中的多线程 大多数情况下,我写的 Android 代码都是可以流畅运行的.直到上几周编写一个需要读取和分析大型文件的 app 之前,我从未关心过 app 运行速度的问题. 尽管我期望用户明白文件越大,耗时越长的道理,有时候他们仍会放弃我的应

RxJava 中的 Subscriptions 是怎样泄露内存的

本文讲的是RxJava 中的 Subscriptions 是怎样泄露内存的, 关于 RxJava 已经有了很多很好的的教程文章.在使用 Android 框架时,它确实显著地简化了工作.然而需要注意,这种简化有它自己的缺陷.在接下来的部分中,你将探索其中的一个,从而了解 RxJava 的 Subscriptions 有多容易造成内存泄漏. 解决简单任务 假设你的主管让你实现一个显示随机的电影名的控件.它必须基于一些外部的推荐服务.这个控件应当根据用户要求显示电影名称.如果用户没有要求,它也可以自己

玖富新的转型到底做什么?以哪个业务作为切入点?

摘要: 成立7年后,玖富时代投资顾问有限公司(玖富),一家以零售金融业务咨询和小微金融管理技术为主营业务的金融顾问机构,试图敲开资本市场的大门. 玖富创始人.总裁孙雷曾供职于香 成立7年后,玖富时代投资顾问有限公司("玖富"),一家以零售金融业务咨询和小微金融管理技术为主营业务的金融顾问机构,试图敲开资本市场的大门. 玖富创始人.总裁孙雷曾供职于香港上市企业高阳科技,并在两年内成为高阳科技金融部总经理.其后,不愿成为高级打工者的孙雷又更换了两次工作,直至自己创业.而孙雷创业的切入点则选

Twitter到底做错了什么,怎么就混到了这样尴尬的境地?

虽然对于很多人而言,这家公司只是一个神秘的404,但对于繁荣昌盛的互联网来说,Twitter却是一个能够代表新媒体.社交平台和移动互联网的成功先驱,曾经一度和Facebook并列社交平台双雄.不过在Facebook.Snapchat等社交平台的用户都蹭蹭往上长时,Twitter的用户增长却停滞不前,营收也一直是亏损状态,因为业绩表现以及未来展望都很难达到市场预期,致使Twitter股价持续下跌,有分析师甚至下了Twitter明年就会被收购的断言,真是一个大写的惨. Twitter到底做错了什么,

联想电视到底做何打算?

摘要: 登顶PC市场的联想集团正在遭遇一场考验,这家靠硬件起家,靠渠道崛起,靠PC成名的传统厂商正在努力补齐其四屏一云的短板:占领家庭中最大的屏幕-电视.面对传统家电厂商的围剿 登顶PC市场的联想集团正在遭遇一场考验,这家靠硬件起家,靠渠道崛起,靠PC成名的传统厂商正在努力补齐其四屏一云的短板:占领家庭中最大的屏幕-电视.面对传统家电厂商的围剿,以及新型互联网公司跨界做电视的冲击,联想电视到底做何打算? 联想集团中国区智能电视事业部总经理任中伟和搜狐家电进行了一场对话,来阐述他眼中的智能电视前景