调度器 Scheduler

调度器 Scheduler

如果你想给Observable操作符链添加多线程功能,你可以指定操作符(或者特定的Observable)在特定的调度器(Scheduler)上执行。

某些ReactiveX的Observable操作符有一些变体,它们可以接受一个Scheduler参数。这个参数指定操作符将它们的部分或全部任务放在一个特定的调度器上执行。

使用ObserveOn和SubscribeOn操作符,你可以让Observable在一个特定的调度器上执行,ObserveOn指示一个Observable在一个特定的调度器上调用观察者的onNext, onError和onCompleted方法,SubscribeOn更进一步,它指示Observable将全部的处理过程(包括发射数据和通知)放在特定的调度器上执行。

RxJava示例

调度器的种类

下表展示了RxJava中可用的调度器种类:

调度器类型 效果
Schedulers.computation( ) 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.immediate( ) 在当前线程立即开始执行任务
Schedulers.io( ) 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread( ) 为每个任务创建一个新线程
Schedulers.trampoline( ) 当其它排队的任务完成后,在当前线程排队开始执行

默认调度器

在RxJava中,某些Observable操作符的变体允许你设置用于操作执行的调度器,其它的则不在任何特定的调度器上执行,或者在一个指定的默认调度器上执行。下面的表格个列出了一些操作符的默认调度器:

操作符 调度器
buffer(timespan) computation
buffer(timespan, count) computation
buffer(timespan, timeshift) computation
debounce(timeout, unit) computation
delay(delay, unit) computation
delaySubscription(delay, unit) computation
interval computation
repeat trampoline
replay(time, unit) computation
replay(buffersize, time, unit) computation
replay(selector, time, unit) computation
replay(selector, buffersize, time, unit) computation
retry trampoline
sample(period, unit) computation
skip(time, unit) computation
skipLast(time, unit) computation
take(time, unit) computation
takeLast(time, unit) computation
takeLast(count, time, unit) computation
takeLastBuffer(time, unit) computation
takeLastBuffer(count, time, unit) computation
throttleFirst computation
throttleLast computation
throttleWithTimeout computation
timeInterval immediate
timeout(timeoutSelector) immediate
timeout(firstTimeoutSelector, timeoutSelector) immediate
timeout(timeoutSelector, other) immediate
timeout(timeout, timeUnit) computation
timeout(firstTimeoutSelector, timeoutSelector, other) immediate
timeout(timeout, timeUnit, other) computation
timer computation
timestamp immediate
window(timespan) computation
window(timespan, count) computation
window(timespan, timeshift) computation

使用调度器

除了将这些调度器传递给RxJava的Observable操作符,你也可以用它们调度你自己的任务。下面的示例展示了Scheduler.Worker的用法:


worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
    }

});
// some time later...
worker.unsubscribe();

递归调度器

要调度递归的方法调用,你可以使用schedule,然后再用schedule(this),示例:


worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
        // recurse until unsubscribed (schedule will do nothing if unsubscribed)
        worker.schedule(this);
    }

});
// some time later...
worker.unsubscribe();

检查或设置取消订阅状态

Worker类的对象实现了Subscription接口,使用它的isUnsubscribed和unsubscribe方法,所以你可以在订阅取消时停止任务,或者从正在调度的任务内部取消订阅,示例:


Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {

    @Override
    public void call() {
        while(!worker.isUnsubscribed()) {
            status = yourWork();
            if(QUIT == status) { worker.unsubscribe(); }
        }
    }

});

Worker同时是Subscription,因此你可以(通常也应该)调用它的unsubscribe方法通知可以挂起任务和释放资源了。

延时和周期调度器

你可以使用schedule(action,delayTime,timeUnit)在指定的调度器上延时执行你的任务,下面例子中的任务将在500毫秒之后开始执行:

someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);

使用另一个版本的schedule,schedulePeriodically(action,initialDelay,period,timeUnit)方法让你可以安排一个定期执行的任务,下面例子的任务将在500毫秒之后执行,然后每250毫秒执行一次:

someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);

测试调度器

TestScheduler让你可以对调度器的时钟表现进行手动微调。这对依赖精确时间安排的任务的测试很有用处。这个调度器有三个额外的方法:

  • advanceTimeTo(time,unit) 向前波动调度器的时钟到一个指定的时间点
  • advanceTimeBy(time,unit) 将调度器的时钟向前拨动一个指定的时间段
  • triggerActions( ) 开始执行任何计划中的但是未启动的任务,如果它们的计划时间等于或者早于调度器时钟的当前时间
时间: 2024-08-30 20:25:58

调度器 Scheduler的相关文章

Yarn 调度器Scheduler详解

理想情况下,我们应用对Yarn资源的请求应该立刻得到满足,但现实情况资源往往是有限的,特别是在一个很繁忙的集群,一个应用资源的请求经常需要等待一段时间才能的到相应的资源.在Yarn中,负责给应用分配资源的就是Scheduler.其实调度本身就是一个难题,很难找到一个完美的策略可以解决所有的应用场景.为此,Yarn提供了多种调度器和可配置的策略供我们选择. 一.调度器的选择 在Yarn中有三种调度器可以选择:FIFO Scheduler ,Capacity Scheduler,FairS ched

Go调度器: M,P和G

这是另一篇关于Go调度器(scheduler)的文章. 原文: GO SCHEDULER: MS, PS & GS by Uber工程师 Povilas. 网上已经有很多关于Go调度器的文章了, 比如 Golang调度器源码分析 ,多看一些,可以加深记忆,也可以对比查看文章中是否有不准确的地方,更全面的了解Go的调度器. 我决定深入了解Go的内部机制, 因为很长时间没人写关于Go scheduler的文章了, 我觉得这是一个很有趣的知识点,所以让我们开始吧. 基础知识 Go的运行时管理着调度.垃

Event Scheduler:MySQL的事件调度器

一.概述 事件调度器是在 MySQL 5.1 中新增的另一个特色功能,可以作为定时任务调度器,取代部分原先只能 用操作系统任务调度器才能完成的定时功>能.例如,Linux 中的 crontabe 只能精确到每分钟执行一 次,而 MySQL 的事件调度器则可以实现每秒钟执行一个任务,这在一些对实时性要>求较高的环境下 就非常实用了. 事件调度器是定时触发执行的,在这个角度上也可以称作是"临时的触发器".触发器只是 针对某个表产生的事件执行一些语句,而事件调度器则是在某一个(间

详解MySQL用事件调度器Event Scheduler创建定时任务_Mysql

前言 事件调度器相当于操作系统中的定时任务(如:Linux中的cron.Window中的计划任务),但MySql的事件调度器可以精确到秒,对于一些实时性要求较高的数据处理非常有用. 1. 创建/修改事件(EVENT) 在MySql中,创建一个新的调度器使用CREATE EVENT,其语法规则如下: CREATE [DEFINER = { user | CURRENT_USER }] EVENT [IF NOT EXISTS] event_name ON SCHEDULE schedule [ON

MySQL Event Scheduler(事件调度器)_Mysql

一.概述 事件调度器是在 MySQL 5.1 中新增的另一个特色功能,可以作为定时任务调度器,取代部分原先只能用操作系统任务调度器才能完成的定时功>能.例如,Linux 中的 crontabe 只能精确到每分钟执行一次,而 MySQL 的事件调度器则可以实现每秒钟执行一个任务,这在一些对实时性要>求较高的环境下就非常实用了. 事件调度器是定时触发执行的,在这个角度上也可以称作是"临时的触发器".触发器只是针对某个表产生的事件执行一些语句,而事件调度器则是在某一个(间隔)时间

【MySQL】事件调度器 (Event Scheduler)

 一 event 介绍         事件调度器是定时触发执行的,在这个角度上也可以称作是"临时的触发器".触发器只是针对某个表产生的事件执行一些语句,而事件调度器则是在某一个(间隔)时间执行一些语句.事件是由一个特定的线程来管理的,也就是所谓的"事件调度器".启用事件调度器后,拥有 SUPER 权限的账户执行 SHOW PROCESSLIST 就可以看到这个线程了.通过设定全局变量event_scheduler 的值即可动态的控制事件调度器是否启用. 在使用这个

MySQL计划任务(事件调度器) Event Scheduler介绍_Mysql

要查看当前是否已开启事件调度器,可执行如下SQL: SHOW VARIABLES LIKE 'event_scheduler';或 SELECT @@event_scheduler;或 SHOW PROCESSLIST;若显示: +-----------------+-------+| Variable_name   | Value |+-----------------+-------+| event_scheduler | OFF   |+-----------------+-------+

在J2EE环境中使用Quartz企业级计划调度器(1)

Quartz 是来自OpenSymphony的一个开发源代码的企业级工作计划调度器.要了解详情以及下载Quartz请访问http://www.quartzscheduler.org/quartz/.你可以在你的J2EE应用如EJB中使用Quartz来调度工作计划.本文将会介绍在你的J2EE应用中如何使用Quartz来安排工作计划.文章中将会使用Oracle应用服务器10g J2EE容器(OC4J 9.0.4)作为J2EE容器的例子. Quartz 支持多种类型的工作和触发器,但其中最流行的就是c

Sparrow: 适用于细粒度tasks低延迟调度的去中心化无状态分布式调度器

背景介绍 Sparrow的论文收录在SOSP 2013,在网上还可以找到一份作者的talk ppt,值得一提的是作者是位PPMM.她之前发表过一篇The Case for Tiny Tasks in Compute Clusters,这篇文章我没有仔细看过,但是当时在看mesos粗细粒度模式的时候,组里有讨论过这篇论文.再结合她的github上的项目,发现她和AMP实验室里Mesos,spark等项目,在研究方向和合作上还是有很多渊源的.透过Sparrow,结合对Mesos.YARN的一些了解,