RxJava 和 RxAndroid 五(线程调度)

对rxJava不了解的同学可以先看

RxJava 和 RxAndroid 一 (基础)
RxJava 和 RxAndroid 二(操作符的使用)
RxJava 和 RxAndroid 三(生命周期控制和内存优化)

RxJava 和 RxAndroid 四(RxBinding的使用)

 

本文将有几个例子说明,rxjava线程调度的正确使用姿势。

例1

 Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

  结果

/rx_call: main           -- 主线程
/rx_map: main        --  主线程
/rx_subscribe: main   -- 主线程

例2

   new Thread(new Runnable() {
            @Override
            public void run() {
                Logger.v( "rx_newThread" , Thread.currentThread().getName()  );
                rx();
            }
        }).start();

 void rx(){
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

    }

 

      结果

/rx_newThread: Thread-564   -- 子线程
/rx_call: Thread-564              -- 子线程
/rx_map: Thread-564            -- 子线程 
/rx_subscribe: Thread-564    -- 子线程

 

  • 通过例1和例2,说明,Rxjava默认运行在当前线程中。如果当前线程是子线程,则rxjava运行在子线程;同样,当前线程是主线程,则rxjava运行在主线程

 

例3

 Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })

                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())

                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

  结果

/rx_call: RxCachedThreadScheduler-1    --io线程
/rx_map: main                                     --主线程
/rx_subscribe: main                              --主线程

 

例4

 Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })
                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })

                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())

                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ; 

      结果

/rx_call: RxCachedThreadScheduler-1     --io线程
/rx_map: RxCachedThreadScheduler-1   --io线程
/rx_subscribe: main                              --主线程

   

  • 通过例3、例4 可以看出  .subscribeOn(Schedulers.io())  和 .observeOn(AndroidSchedulers.mainThread()) 写的位置不一样,造成的结果也不一样。从例4中可以看出 map() 操作符默认运行在事件产生的线程之中。事件消费只是在 subscribe() 里面。
  • 对于 create() , just() , from()   等                 --- 事件产生   

               map() , flapMap() , scan() , filter()  等    --  事件加工

              subscribe()                                          --  事件消费

  •   事件产生:默认运行在当前线程,可以由 subscribeOn()  自定义线程

         事件加工:默认跟事件产生的线程保持一致, 可以由 observeOn() 自定义线程

       事件消费:默认运行在当前线程,可以有observeOn() 自定义

 

例5  多次切换线程

 

Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );

                        subscriber.onNext( "dd");
                        subscriber.onCompleted();
                    }
                })

                .observeOn( Schedulers.newThread() )    //新线程

                .map(new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88";
                    }
                })

                .observeOn( Schedulers.io() )      //io线程

                .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        Logger.v( "rx_filter" , Thread.currentThread().getName()  );
                        return s != null ;
                    }
                })

                .subscribeOn(Schedulers.io())     //定义事件产生线程:io线程
                .observeOn(AndroidSchedulers.mainThread())     //事件消费线程:主线程

                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

  结果

/rx_call: RxCachedThreadScheduler-1           -- io 线程
/rx_map: RxNewThreadScheduler-1             -- new出来的线程
/rx_filter: RxCachedThreadScheduler-2        -- io线程
/rx_subscribe: main                                   -- 主线程

 

例6:只规定了事件产生的线程

       Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.v( "rx--create " , Thread.currentThread().getName() ) ;
                        subscriber.onNext( "dd" ) ;
                    }
                })
                .subscribeOn(Schedulers.io())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;
                    }
                }) ;

  结果

/rx--create: RxCachedThreadScheduler-4                      // io 线程
/rx--subscribe: RxCachedThreadScheduler-4                 // io 线程

     

例:7:只规定事件消费线程

 Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.v( "rx--create " , Thread.currentThread().getName() ) ;
                        subscriber.onNext( "dd" ) ;
                    }
                })
                .observeOn( Schedulers.newThread() )
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;
                    }
                }) ;

  结果

/rx--create: main                                           -- 主线程
/rx--subscribe: RxNewThreadScheduler-1        --  new 出来的子线程 

      

    从例6可以看出,如果只规定了事件产生的线程,那么事件消费线程将跟随事件产生线程。

    从例7可以看出,如果只规定了事件消费的线程,那么事件产生的线程和 当前线程保持一致。

 

例8:线程调度封装

 在Android 常常有这样的场景,后台处理处理数据,前台展示数据。

一般的用法:

   Observable
                .just( "123" )
                .subscribeOn( Schedulers.io())
                .observeOn( AndroidSchedulers.mainThread() )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                    }
                }) ;

  但是项目中这种场景有很多,所以我们就想能不能把这种场景的调度方式封装起来,方便调用。

简单的封装

    public Observable apply( Observable observable ){
       return observable.subscribeOn( Schedulers.io() )
                .observeOn( AndroidSchedulers.mainThread() ) ;
    }

使用

  apply( Observable.just( "123" ) )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {

                    }
                }) ;

弊端:虽然上面的这种封装可以做到线程调度的目的,但是它破坏了链式编程的结构,是编程风格变得不优雅。

改进:Transformers 的使用(就是转化器的意思,把一种类型的Observable转换成另一种类型的Observable )

改进后的封装

    Observable.Transformer schedulersTransformer = new  Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable)  observable).subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };

  使用

      Observable
                .just( "123" )
                .compose( schedulersTransformer )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                    }
                }) ;

  弊端:虽然保持了链式编程结构的完整,但是每次调用 .compose( schedulersTransformer ) 都是 new 了一个对象的。所以我们需要再次封装,尽量保证单例的模式。

改进后的封装

package lib.app.com.myapplication;

import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;

/**
 * Created by ${zyj} on 2016/7/1.
 */
public class RxUtil {

    private final static Observable.Transformer schedulersTransformer = new  Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable)  observable).subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };

   public static  <T> Observable.Transformer<T, T> applySchedulers() {
        return (Observable.Transformer<T, T>) schedulersTransformer;
    }

}

  使用

    Observable
                .just( "123" )
                .compose( RxUtil.<String>applySchedulers() )
                .subscribe(new Action1() {
                    @Override
                    public void call(Object o) {
                    }
                }) ;

  

 

 

 

 

时间: 2024-10-03 03:43:42

RxJava 和 RxAndroid 五(线程调度)的相关文章

RxJava 和 RxAndroid 三(生命周期控制和内存优化)

rxjava rxandroid 赵彦军 前言:对Rxjava.Rxandroid不了解的同学可以先看看RxJava 和 RxAndroidRxJava 和 RxAndroid 二(操作符的使用) RxJava使我们很方便的使用链式编程,代码看起来既简洁又优雅.但是RxJava使用起来也是有副作用的,使用越来越多的订阅,内存开销也会变得很大,稍不留神就会出现内存溢出的情况,这篇文章就是介绍Rxjava使用过程中应该注意的事项. 1.取消订阅 subscription.unsubscribe()

RxJava 和 RxAndroid 二(操作符的使用)

前言:对Rx不了解的朋友可以先看我的第一篇博文 RxJava 和 RxAndroid 一 (基础),是对Rxjava的基本介绍   1.merge操作符,合并观察对象 19 List<String> list1 = new ArrayList<>() ; 20 List<String> list2 = new ArrayList<>() ; 21 22 list1.add( "1" ) ; 23 list1.add( "2&qu

RxJava 和 RxAndroid 一 (基础)

1.RxJava 项目地址    https://github.com/ReactiveX/RxJava   2.RxAndroid 项目地址    https://github.com/ReactiveX/RxAndroid   3.RxJava 和 RxAndroid 的关系      RxAndroid是RxJava的一个针对Android平台的扩展,主要用于 Android 开发   4.RxJava和EventBus的区别?      https://www.zhihu.com/que

[Android]基于RxJava、RxAndroid的EventBus实现

以下内容为原创,欢迎转载,转载请注明 来自天天博客:http://www.cnblogs.com/tiantianbyconan/p/4578699.html    Github:https://github.com/wangjiegulu/RxAndroidEventsSample EventBus的作用是发布/订阅事件总线,因为项目中用到RxJava.RxAndroid,所以完全可以使用RxJava.RxAndroid来实现EventBus.   1. 编写RxBus,用于存储所有事件Sub

RxJava 2.x 使用最佳实践

转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/76443347 本文出自[赵彦军的博客] 以前写过 Rxjava 系列教程, 如下所示 RxJava 和 RxAndroid 一 (基础) RxJava 和 RxAndroid 二(操作符的使用) RxJava 和 RxAndroid 三(生命周期控制和内存优化) RxJava 和 RxAndroid 四(RxBinding的使用) RxJava 和 RxAndroid 五(线程调

【Android】RxJava之初始篇

关于RxJava RxJava是ReactiveX推出在Java VM环境下使用的异步操作库.除了在Java环境ReactiveX也为其他编程语言推出Rx库,例如Py.Js.Go等.网上有很多关于对RxJava的介绍和使用,在Android开发中也有很多项目使用RxJava.那为什么还要使用RxJava呢,Android开发也有提供异步操作的方法供开发者使用,我想应该是RxJava比起Handle.AsyncTask简洁优雅. 1 RxJava采用链式调用,在程序逻辑上清晰简洁 2 采用扩展式观

Android开发软件架构思考以及经验总结

一.萌芽 作为一只编程经验并不怎么丰富的程序猿来讲,我一直觉得架构师是一个比较神秘的职业,架构设计就更加的高大上了.经过今年的几个项目,之前曾发文叙述我的从MVC到MVP项目重构实战经验,也曾说过我准备对目前手底下的项目进行重构.但是,前段时间,我改变了我的想法.开发模式的重构,仅仅只是换了一个套路,也许在重构的过程中对业务的逻辑进行了一次梳理,也是在基于前人的代码设计上进行了一些优化.但是,这远远还不够,这不是我理想中的开发场景.在项目开发的过程中,也发现存在许多的问题,但是都是一些零散的问题

网络请求框架 Retrofit 2 使用入门

本文讲的是网络请求框架 Retrofit 2 使用入门, 你将要创造什么 Retrofit 是什么? Retrofit 是一个用于 Android 和 Java 平台的类型安全的网络请求框架.Retrofit 通过将 API 抽象成 Java 接口而让我们连接到 REST web 服务变得很轻松.在这个教程里,我会向你介绍如何使用这个 Android 上最受欢迎和经常推荐的网络请求库之一. 这个强大的库可以很简单的把返回的 JSON 或者 XML 数据解析成简单 Java 对象(POJO).GE

android 快速开发框架,基于组件化的MVP结构

AndroidMouldProject2 android快速开发模板工程,组件化的MVP结构,方便快速开发和多人协作,减少代码耦合,同时方便自定义扩展,封装了一些常用 的模块,通过引用AndroidBaseModule基础工程的方式来实现持续更新,同时通过依赖库的方式,增强个性化定制,使工程更加 简洁,使开发者只需要关心自己的逻辑实现,便于维护. 项目结构: 使用方法: 复制代码,修改包名,按照自己的需求更改代码逻辑和依赖关系 新建activity,fragment,model,view,pre