【Python】轻量级分布式任务调度系统-RQ

一 前言   
   Redis Queue 一款轻量级的P分布式异步任务队列,基于Redis作为broker,将任务存到redis里面,然后在后台执行指定的Job。就目前而言有三套成熟的工具celery,huey ,rq 。按照功能和使用复杂度来排序的话也是 celery>huey>rq. 因为rq 简单,容易上手,所以自己做的系统也会使用RQ作为分布式任务调度系统。
二 安装 
   因为RQ 依赖于Redis 故需要安装版本>= 2.6.0.具体安装方法请参考《Redis初探》。*nix 系统环境下安装RQ:

  1. pip install rq

无需其他配置即可以使用RQ。
三 原理
   RQ 主要由三部分构成 Job ,Queues,Worker 构成。job也就是开发定义的函数用来实现具体的功能。调用RQ 把job 放入队列Queues,Worker 负责从redis里面获取任务并执行,根据具体情况返回函数的结果。
3.1 关于job
   一个任务(job)就是一个Python对象,具体表现为在一个工作(后台)进程中异步调用一个函数。任何Python函数都可以异步调用,简单的将函数与参数追加到队列中,这叫做入队(enqueueing)。
3.2 关于Queue
   将任务加入到队列之前需要初始化一个连接到指定Redis的Queue

  1. q=Queue(connection=redis_conn)
  2. from rq_test import hello
  3. result = q.enqueue(hello,'yangyi')

   queue有如下属性:
   timeout :指定任务最长执行时间,超过该值则被认为job丢失,对于备份任务 需要设置一个比较长的时间 比如24h。
   result_ttl :存储任务返回值的有效时间,超过该值则失效。
   ttl :specifies the maximum queued time of the job before it'll be cancelled
   depends_on :specifies another job (or job id) that must complete before this job will be queued
   job_id : allows you to manually specify this job's job_id
   at_front :will place the job at the front of the queue, instead of the back
   kwargs and args : lets you bypass the auto-pop of these arguments, ie: specify a timeout argument for the underlying job function.
  需要关注的是 depends_on ,通过该属性可以做级联任务A-->B ,只有当A 执行成功之后才能执行B .
  通过指定队列的名字,我们可以把任务加到一个指定的队列中:

  1. q = Queue("low", connection = redis_conn)
  2. q.enqueue(hello, "杨一")

 对于例子中的Queue("low"),具体使用的时候可以替换"low"为任意的复合业务逻辑名字,这样就可以根据业务的需要灵活地归类的任务了。一般会根据优先级给队列命名(如:high, medium, low).
 如果想要给enqueue传递参数的情况,可以使用enqueue_call方法。在要传递超时参数的情况下:

  1. q = Queue("low", connection = redis_conn)
  2. q.enqueue_call(func=hello, args= ("杨一",),timeout = 30)

3.3 关于worker
   Workers将会从给定的队列中不停的循环读取任务,当所有任务都处理完毕就等待新的work到来。每一个worker在同一时间只处理一个任务。在worker中,是没有并发的。如果你需要并发处理任务,那就需要启动多个worker。
   目前的worker实际上是fork一个子进程来执行具体的任务,也就是说rq不适合windows系统。而且RQ的work是单进程的,如果想要并发执行队列中的任务提高执行效率需要使用threading针对每个任务进行fork线程。
   worker的生命周期有以下几个阶段组成:
   1 启动,载入Python环境
   2 注册,worker注册到系统上,让系统知晓它的存在。
   3 开始监听。从给定的redis队列中取出一个任务。如果所有的队列都是空的且是以突发模式运行的,立即退出。否则,等待新的任务入队。
   4 分配一个子进程。分配的这个子进程在故障安全的上下文中运行实际的任务(调用队列中的任务函数)
   5 处理任务。处理实际的任务。
   6 循环。重复执行步骤3。
四 如何使用
   简单的开发一个deamon 函数,用于后端异步调用,注意任意函数都可以加入队列,必须能够在入队的时候 被程序访问到。
 

  1. #!/usr/bin/env python
  2. #-*- coding:utf-8 -*-
  3. def hello(name):
  4.     print "hello ,%s"%name
  5.     ip='192.168.0.1'
  6.     num=1024
  7.     return name,ip,num
  8. def workat(name):
  9.     print "hello %s ,you r workat youzan.com "%(name)

4.1 构建队列,将任务对象添加到队列里面

  1. >>> from redis import Redis,ConnectionPool
  2. >>> from rq import Queue
  3. >>> pool = ConnectionPool(db=0, host='127.0.0.1', port=6379,
  4. ... password='yangyi')
  5. >>> redis_conn = Redis(connection_pool=pool)
  6. >>> q=Queue(connection=redis_conn)
  7. >>> from rq_test import hello
  8. >>>
  9. >>> result = q.enqueue(hello,'yangyi')
  10. >>> result = q.enqueue(hello,'youzan.com')

先实例化一个Queue类q,然后通过enqueue方法发布任务。第一个参数是执行的函数名,后面是函数执行所需的参数,可以是args也可以是kwargs,案例中是一个字符串。
然后会返回一个Job类的实例,后面会具体介绍Job类的实例具体的api。

4.2启动worker ,从日志上可以看到执行了utils.hello('yangyi') utils.hello('youzan.com') 。当然这个只是简单的调用介绍,生产环境还要写的更加健壮,针对函数执行的结果进行相应的业务逻辑处理。 

  1. root@rac2:~# >python woker.py
  2. 23:44:48 RQ worker u'rq:worker:rac2.3354' started, version 0.6.0
  3. 23:44:48 Cleaning registries for queue: default
  4. 23:44:48
  5. 23:44:48 *** Listening on default...
  6. 23:44:48 default: utils.hello('yangyi') (63879f7c-b453-4405-a262-b9a6b6568b68)
  7. hello ,yangyi
  8. 23:44:48 default: Job OK (63879f7c-b453-4405-a262-b9a6b6568b68)
  9. 23:44:48 Result is kept for 500 seconds
  10. 23:44:48
  11. 23:44:48 *** Listening on default...
  12. 23:45:12 default: utils.hello('youzan.com') (e4e9ed62-c476-45f2-b66a-4b641979e731)
  13. hello ,youzan.com
  14. 23:45:12 default: Job OK (e4e9ed62-c476-45f2-b66a-4b641979e731)
  15. 23:45:12 Result is kept for 500 seconds

需要说明的是其实 worker的启动顺序应该在job放入队列之前,一直监听rq里面是否有具体的任务,当然如果worker晚于job 加入队列启动,job的状态会显示为 queued 状态。
4.3 查看作业执行的情况
当任务加入队列,queue.enqueue()方法返回一个job实例。其定义位于rq.job文件中,可以去查看一下它的API,主要用到的API有:

  1. >>> from rq import job
  2. >>> job = q.enqueue(hello,'youzan.com')
  3. >>> job.get_id() ##获取任务的id ,如果没有指定 ,系统会自动分配一个随机的字符串。
  4. u'17ad0b3a-195e-49d5-8d31-02837ccf5fa6'
  5. >>> job = q.enqueue(hello,'youzan.com')
  6. >>> print job.get_status() ##获取任务的处理状态
  7. finished
  8. >>> step1=q.enqueue(workat,) ##故意不传递参数,让函数执行失败,则获取的状态值是 failed
    >>> print step1.get_status()
    failed
  9. >>> print job.result # 当任务没有执行的时候返回None,否则返回非空值,如果 函数 hello() 有return 的值,会赋值给result
  10. None
  11. 当我们把worker 监听进程停止,然后重新发布任务,查看此时任务的在队列的状态,会显示为 queued
  12. >>> job = q.enqueue(hello,'youzan')
  13. >>> print job.get_status()
  14. queued
  15. >>> print job.to_dict() #把job实例转化成一个字典,我们主要关注状态。
  16. {u'origin': u'default', u'status': u'queued', u'description': u"rq_test.hello('youzan')", u'created_at': '2016-09-06T08:00:40Z', u'enqueued_at': '2016-09-06T08:00:40Z', u'timeout': 180, u'data': '\x80\x02(X\r\x00\x00\x00rq_test.helloq\x01NU\x06youzanq\x02\x85q\x03}q\x04tq\x05.'}
  17. >>> job.cancel() # 取消作业,尽管作业已经被执行,也可以取消
  18. >>> print job.to_dict()
  19. {u'origin': u'default', u'status': u'queued', u'description': u"rq_test.hello('youzan')", u'created_at': '2016-09-06T08:00:40Z', u'enqueued_at': '2016-09-06T08:00:40Z', u'timeout': 180, u'data': '\x80\x02(X\r\x00\x00\x00rq_test.helloq\x01NU\x06youzanq\x02\x85q\x03}q\x04tq\x05.'}
  20. >>> print job.get_status()
  21. queued
  22. >>>
  23. >>> job.delete() # 从redis队列中删除该作业
  24. >>> print job.get_status()
  25. None
  26. >>> print job.to_dict()
  27. {u'origin': u'default', u'description': u"rq_test.hello('youzan')", u'created_at': '2016-09-06T08:00:40Z', u'enqueued_at': '2016-09-06T08:00:40Z', u'timeout': 180, u'data': '\x80\x02(X\r\x00\x00\x00rq_test.helloq\x01NU\x06youzanq\x02\x85q\x03}q\x04tq\x05.'}

五 参考文章
[1]官方文档  
[2] 翻译 - Python RQ Job 
[3] 翻译 - Python RQ Workers  
[4] 云峰就她了 这位博主写了很多rq相关的实践经验,值得参考。

时间: 2024-10-28 02:43:46

【Python】轻量级分布式任务调度系统-RQ的相关文章

XXL-JOB v1.4.2,分布式任务调度平台

V1.4.2 新特性 1.推送新版本 V1.4.2 至中央仓库, 大版本 V1.4 进入维护阶段; <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>1.4.2</version> </dependency> 2.任务新增时,任务列表偏移问题修复; 3.修复一处因bs不支持模态框重

任务调度系统 xxl-job,V1.4.1 新特性速览

分布式任务调度系统 xxl-job新版本V1.4.1更新内容: 1.项目成功推送maven中央仓库, 中央仓库地址以及依赖如下: <!-- http://repo1.maven.org/maven2/com/xuxueli/xxl-job-core/ --> <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <ver

任务调度系统 xxl-job,新版本 V1.4.0 新特性速览

分布式任务调度系统 xxl-job新版本V1.4.0更新内容: 1.任务依赖: 通过事件触发方式实现, 任务执行成功并回调时会主动触发一次子任务的调度; 2.执行器底层实现代码进行重度重构, 优化底层建表脚本; 3.执行器中任务线程分组逻辑优化: 之前根据执行器JobHandler进行线程分组,当多个任务复用Jobhanlder会导致相互阻塞.现改为根据调度中心任务进行任务线程分组,任务与任务执行相互隔离; 4.执行器调度通讯方案优化, 通过Hex + HC实现建议RPC通讯协议, 优化了通讯参

分布式任务调度框架

LTS是一个轻量级分布式任务调度框架.有三种角色, JobClient, JobTracker, TaskTracker.各个节点都是无状态的,可以部署多个,来实现负载均衡,实现更大的负载量, 并且框架具有很好的容错能力. 采用多种注册中心(Zookeeper,redis等)进行节点信息暴露,master选举.(Mongo or Mysql)存储任务队列和任务执行日志, netty做底层通信. 文章转载自 开源中国社区 [http://www.oschina.net]

python的分布式任务huey如何实现异步化任务讲解

 本文我们来分享一个python的轻型的任务队列程序,他可以让python的分布式任务huey实现异步化任务,感兴趣的朋友可以看看.     一个轻型的任务队列,功能和相关的broker没有celery强大,重在轻型,而且代码读起来也比较的简单.  关于huey的介绍:  (比celery轻型,比mrq.rq要好用 !) a lightweight alternative.     written in python     no deps outside stdlib, except redi

伏羲—阿里云分布式调度系统

今天,大数据已经从概念发展到在很多行业落地生根.广泛用在电商.金融.企业等行业,帮助行业分析数据.挖掘数据的价值.即使在传统的医疗.安全.交通等领域也越来越多的应用大数据的技术.数据.价值二者之间的联系是计算,计算是大数据中最核心的部分.大数据计算就是将原来一台台的服务器通过网络连接起来成为一个整体,对外提供体验一致的计算功能,即分布式计算. 点击查看回顾视频 伏羲系统架构 分布式调度系统需要解决两个问题: 任务调度:如何将海量数据分片,并在几千上万台机器上并行处理,最终汇聚成用户需要的结果?当

探秘阿里分布式任务调度服务SchedulerX

7月中上旬,阿里云企业级分布式应用服务EDAS低调上线分布式任务调度服务,目前处于免费公测阶段.SchedulerX--是该服务在阿里内部的产品名字,顾名思义,比调度做的更多.随着公测的开启,阿里巴巴内部又一款核心中间件产品浮出水面. SchedulerX是阿里巴巴集团中间件团队开发的一款高性能.分布式任务调度产品,在阿里内部有着广泛的使用,经过集团内上千个业务应用历经多年打磨而成.截止2016年6月,每天平稳运行集团内几十万个任务,完成每天几亿次的任务调度.在未来SchedulerX将支持更多

解析阿里云分布式调度系统伏羲

云计算并不是无中生有的概念,它是将普通的单台PC的计算能力通过分布式调度的软件连接起来.其最核心的问题是如何把100台.1千台.1万台机器高效的组织起来,灵活的进行任务调度和管理,从而使得可以像使用台式机一样使用云计算.在云计算中,最核心的模块是分布式调度,它好比于云计算的中央处理器.目前,业界已存在多种分布式调度实现方案,如伏羲.Hadoop MR.YARN.Mesos等系统. 阿里云伏羲 伏羲系统是在前人的基础上进行了一系列的改造,首先与YARN和Mesos系统类似,将资源的调度和任务调度分

分布式实时处理系统在高性能计算场景下的应用

本文根据DBAplus社群第82期线上分享整理而成   讲师介绍  卢誉声 Autodesk资深系统研发工程师   <分布式实时处理系统:原理.架构与实现>作者,Hurricane实时处理系统主要贡献者,多部C++领域译作.   大家好,我们今天主要讨论以下几个问题: 机器学习与实时处理系统应用 分布式计算拓扑搭建 消息算法调优 Hurricane计算框架与未来展望   一.机器学习与实时处理系统应用   现在我们先来看看第一部分:机器学习与实时处理系统应用.我们首先简单了解下机器学习,然后引