Python多进程通信Queue、Pipe、Value、Array实例_python

queue和pipe的区别: pipe用来在两个进程间通信。queue用来在多个进程间实现通信。 此两种方法为所有系统多进程通信的基本方法,几乎所有的语言都支持此两种方法。

1)Queue & JoinableQueue

queue用来在进程间传递消息,任何可以pickle-able的对象都可以在加入到queue。

multiprocessing.JoinableQueue 是 Queue的子类,增加了task_done()和join()方法。

task_done()用来告诉queue一个task完成。一般地在调用get()获得一个task,在task结束后调用task_done()来通知Queue当前task完成。

join() 阻塞直到queue中的所有的task都被处理(即task_done方法被调用)。

代码:

复制代码 代码如下:

import multiprocessing
import time

class Consumer(multiprocessing.Process):
   
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print ('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            print ('%s: %s' % (proc_name, next_task))
            answer = next_task() # __call__()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)

if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
   
    # Start consumers
    num_consumers = multiprocessing.cpu_count()
    print ('Creating %d consumers' % num_consumers)
    consumers = [ Consumer(tasks, results)
                  for i in range(num_consumers) ]
    for w in consumers:
        w.start()
   
    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))
   
    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()
   
    # Start printing results
    while num_jobs:
        result = results.get()
        print ('Result:', result)
        num_jobs -= 1

注意小技巧: 使用None来表示task处理完毕。

运行结果:

2)pipe

pipe()返回一对连接对象,代表了pipe的两端。每个对象都有send()和recv()方法。

代码:

复制代码 代码如下:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    p.join()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"

3)Value + Array

Value + Array 是python中共享内存 映射文件的方法,速度比较快。

复制代码 代码如下:

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = n.value + 1
    for i in range(len(a)):
        a[i] = a[i] * 10

if __name__ == '__main__':
    num = Value('i', 1)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])
   
    p2 = Process(target=f, args=(num, arr))
    p2.start()
    p2.join()

    print(num.value)
    print(arr[:])

# the output is :
# 2
# [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
# 3
# [0, 100, 200, 300, 400, 500, 600, 700, 800, 900]

时间: 2024-09-15 08:27:11

Python多进程通信Queue、Pipe、Value、Array实例_python的相关文章

python的multiprocessing多进程通信的pipe和queue介绍

python的multiprocessing提供了IPC(Pipe和Queue),使Python多进程并发,效率上更高.本文我们就来详细介绍一下pipe和queue.     这两天温故了python的multiprocessing多进程模块,看到的pipe和queue这两种ipc方式,啥事ipc? ipc就是进程间的通信模式,常用的一半是socke,rpc,pipe和消息队列等. 今个就再把pipe和queue搞搞. 代码如下   #coding:utf-8 import multiproce

python修改注册表终止360进程实例_python

本文实例讲述了python修改注册表终止360进程的实现方法.分享给大家供大家参考. 具体实现代码如下: import _winreg import os import shutil #复制自身 shutil.copyfile(K3.exe,c:WINDOWSsystem32K3.exe) #把360启动改为自身 run = _winreg.OpenKey( _winreg.HKEY_LOCAL_MACHINE, "SOFTWAREMicrosoftWindowsCurrentVersionRu

python网络编程之读取网站根目录实例_python

本文实例讲述了python网络编程之读取网站根目录的方法,分享给大家供大家参考. 具体实现方法如下: import socket, sys port = 70 host = "quux.org" filename = "//" s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((host, port)) s.sendall(filename+"\r\n") while(

python 循环while和for in简单实例_python

python 循环while和for in简单实例 #!/uer/bin/env python # _*_ coding: utf-8 _*_ lucknumber = 5 b = 0 while b <3: print('guss count:',b) a = int(input('you guse number')) if a > lucknumber: print ('youaerbiger') elif a == lucknumber: print ('youare righet')

Python multiprocessing模块中的Pipe管道使用实例_python

multiprocessing.Pipe([duplex]) 返回2个连接对象(conn1, conn2),代表管道的两端,默认是双向通信.如果duplex=False,conn1只能用来接收消息,conn2只能用来发送消息.不同于os.open之处在于os.pipe()返回2个文件描述符(r, w),表示可读的和可写的 实例如下: 复制代码 代码如下: #!/usr/bin/python #coding=utf-8 import os from multiprocessing import P

Python的Tornado框架异步编程入门实例_python

Tornado Tornado 是一款非阻塞可扩展的使用Python编写的web服务器和Python Web框架, 可以使用Tornado编写Web程序并不依赖任何web服务器直接提供高效的web服务.所以Tornado不仅仅是一个web框架而且还是一款可以用于生产环境的高效的web服务器 Torando 在Linux和FreeBSD上使用高效的异步I/O模型 epoll 和kqueue来实现高效的web服务器, 所以 tornado在Linux上和FreeBSD系列性能可以达到最高接口 当然我

python计算书页码的统计数字问题实例_python

本文实例讲述了python计算书页码的统计数字问题,是Python程序设计中一个比较典型的应用实例.分享给大家供大家参考.具体如下: 问题描述:对给定页码n,计算出全部页码中分别用到多少次数字0,1,2,3,4...,9 实例代码如下: def count_num1(page_num): num_zero = 0 num_one = 0 num_two = 0 num_three = 0 num_four = 0 num_five = 0 num_six = 0 num_seven = 0 nu

python实现根据图标提取分类应用程序实例_python

本文实例讲述了python实现根据图标提取分类应用程序,分享给大家供大家参考. 具体方法如下: #!/usr/bin/python # -*- coding: utf-8 -*- import Image import win32ui import win32gui def make_regalur_image(img, size = (256, 256)): return img.resize(size).convert('RGB') def split_image(img, part_siz

python在windows下实现备份程序实例_python

很多书籍里面讲的Python备份都是在linux下的,而在xp上测试一下也可以执行备份功能,代码都差不多相同,就是到执行打包的时候是不一样的.而且要用到winrar,其他的压缩文件也是一样的. 首先我们要把winrar的路径添加到path里面,这里添加完了要重启机子才有效. 这里要注意:把winrar的路径添加到path里面之后一定要重启,否则path的设定不会起作用,打包就会失败!  这里用到得命令是:winrar a xxx.zip xxxx xxx为任意字符   实例代码如下: #备份脚本