在Python程序中实现分布式进程的教程

   这篇文章主要介绍了在Python程序中实现分布式进程的教程,在多进程编程中十分有用,示例代码基于Python2.x版本,需要的朋友可以参考下

  在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。

  Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。

  举个例子:如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?

  原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。

  我们先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:

  ?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# taskmanager.py
 
import random, time, Queue
from multiprocessing.managers import BaseManager
 
# 发送任务的队列:
task_queue = Queue.Queue()
# 接收结果的队列:
result_queue = Queue.Queue()
 
# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
pass
 
# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey='abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
r = result.get(timeout=10)
print('Result: %s' % r)
# 关闭:
manager.shutdown()

  请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。

  然后,在另一台机器上启动任务进程(本机上启动也可以):

  ?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# taskworker.py
 
import time, sys, Queue
from multiprocessing.managers import BaseManager
 
# 创建类似的QueueManager:
class QueueManager(BaseManager):
pass
 
# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
 
# 连接到服务器,也就是运行taskmanager.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与taskmanager.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey='abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task queue is empty.')
# 处理结束:
print('worker exit.')

  任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。

  现在,可以试试分布式进程的工作效果了。先启动taskmanager.py服务进程:

  ?

1
2
3
4
5
6
7
8
9
10
11
12

$ python taskmanager.py
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...

  taskmanager进程发送完任务后,开始等待result队列的结果。现在启动taskworker.py进程:

  ?

1
2
3
4
5
6
7
8
9
10
11
12
13

$ python taskworker.py 127.0.0.1
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.

  taskworker进程结束,在taskmanager进程中会继续打印出结果:

  ?

1
2
3
4
5
6
7
8
9
10

Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956

  这个简单的Manager/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。

  Queue对象存储在哪?注意到taskworker.py中根本没有创建Queue的代码,所以,Queue对象存储在taskmanager.py进程中:


  而Queue之所以能通过网络访问,就是通过QueueManager实现的。由于QueueManager管理的不止一个Queue,所以,要给每个Queue的网络调用接口起个名字,比如get_task_queue。

  authkey有什么用?这是为了保证两台机器正常通信,不被其他机器恶意干扰。如果taskworker.py的authkey和taskmanager.py的authkey不一致,肯定连接不上。

  小结

  Python的分布式进程接口简单,封装良好,适合需要把繁重任务分布到多台机器的环境下。

  注意Queue的作用是用来传递任务和接收结果,每个任务的描述数据量要尽量小。比如发送一个处理日志文件的任务,就不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。

时间: 2024-10-03 10:47:38

在Python程序中实现分布式进程的教程的相关文章

编程-python问题求解,怎么在程序中结束整个进程?

问题描述 python问题求解,怎么在程序中结束整个进程? tb=myblock()def my_block(): tb.Start(True) tb.Wait()t=threading.Thread(target=my_block)t.setDaemon(True)t.start()start_time=time.time()while True: print ""susccess"" print time.time() if int(time.time()-st

在Python程序中操作文件之flush()方法的使用

  这篇文章主要介绍了在Python程序中操作文件之flush()方法的使用教程,是Python入门学习中的基础知识,需要的朋友可以参考下 flush()方法刷新内部缓冲区,像标准输入输出的fflush.这类似文件的对象,无操作. Python关闭时自动刷新文件.但是可能要关闭任何文件之前刷新数据. 语法 以下是flush()方法的语法: ? 1 fileObject.flush(); 参数 NA 返回值 此方法不返回任何值. 例子 下面的例子显示了flush()方法的使用. ? 1 2 3 4

在Python程序中操作文件之isatty()方法的使用

  这篇文章主要介绍了在Python程序中操作文件之isatty()方法的使用教程,是Python入门学习中的基础知识,需要的朋友可以参考下 如果文件已连接(与终端设备相关联)到一个tty(状)的设备,isatty()方法返回True,否则返回False. 语法 以下是isatty()方法的语法: ? 1 fileObject.isatty(); 参数 NA 返回值 如果该文件被连接(与终端设备相关联)到一个tty(类似终端)设备此方法返回true,否则返回false. 例子 下面的例子显示了i

在Python程序中进行文件读取和写入操作的教程

  这篇文章主要介绍了在Python程序中进行文件读取和写入操作的教程,是Python学习当中的基础知识,需要的朋友可以参考下 读写文件是最常见的IO操作.Python内置了读写文件的函数,用法和C是兼容的. 读写文件前,我们先必须了解一下,在磁盘上读写文件的功能都是由操作系统提供的,现代操作系统不允许普通的程序直接操作磁盘,所以,读写文件就是请求操作系统打开一个文件对象(通常称为文件描述符),然后,通过操作系统提供的接口从这个文件对象中读取数据(读文件),或者把数据写入这个文件对象(写文件).

linux下某程序中实现对进程的实时流量监控功能

问题描述 linux下某程序中实现对进程的实时流量监控功能 求大牛赐教 现在开发了一个程序,在linux下跑,想在里面加一个对特定进程的网络流量监控,实时统计进程流量大小 现在想到的办法就是用libpcap库,对应/proc里面文件按照pid 端口号 数据包 数据大小 进行统计得出当前流量大小. 目前有如下问题: 1.程序中已有功能中已经使用了libpcap去抓去一段数据包然后输出libpcap文件,如果按照上述办法,会不会造成再用libpcap采集数据包出问题?或者说libpcap可不可以多次

Python程序中设置HTTP代理_python

0x00 前言 大家对HTTP代理应该都非常熟悉,它在很多方面都有着极为广泛的应用.HTTP代理分为正向代理和反向代理两种,后者一般用于将防火墙后面的服务提供给用户访问或者进行负载均衡,典型的有Nginx.HAProxy等.本文所讨论的是正向代理. HTTP代理最常见的用途是用于网络共享.网络加速和网络限制突破等.此外,HTTP代理也常用于Web应用调试.Android/IOS APP 中所调用的Web API监控和分析,目前的知名软件有Fiddler.Charles.Burp Suite和mi

Python程序中不同的重启机制

分析典型案例: Celery 分布式异步任务框架 Gunicorn Web容器 之所以挑这两个,不仅仅是应用广泛,而且两个的进程模型比较类似,都是Master.Worker的形式,在热重启上思路和做法又基本不同,比较有参考意义 知识点: atexit os.execv 模块共享变量 信号处理 sleep原理:select 文件描述符共享 这几个知识点不难,区别只在于Celery和Gunicorn的应用方式.如果脑海中有这样的知识点,这篇文章也就是开阔下视野而已... Celery和Gunicor

从Python程序中访问Java类的简单示例_python

from jnius import autoclass >>> Stack = autoclass('java.util.Stack') >>> stack = Stack() >>> stack.push('hello') >>> stack.push('world') >>> stack.pop() 'world' >>> stack.pop() 'hello' 上面的代码中,我们使用 auto

Python程序中使用SQLAlchemy时出现乱码的解决方案_python

今天对clubot进行了升级, 但是导入数据后中文乱码, 一开是找资料说是在创建引擎的时候添加编码信息: engine = create_engine("mysql://root:@localhost:3306/clubot?charset=utf8") 但是这并不行, 然后查看表信息: > show create table clubot_members; clubot_members | CREATE TABLE `clubot_members` ( `id` int(11)