简单谈谈python中的Queue与多进程_python

最近接触一个项目,要在多个虚拟机中运行任务,参考别人之前项目的代码,采用了多进程来处理,于是上网查了查python中的多进程

一、先说说Queue(队列对象)

Queue是python中的标准库,可以直接import 引用,之前学习的时候有听过著名的“先吃先拉”与“后吃先吐”,其实就是这里说的队列,队列的构造的时候可以定义它的容量,别吃撑了,吃多了,就会报错,构造的时候不写或者写个小于1的数则表示无限多

import Queue

q = Queue.Queue(10)

向队列中放值(put)

q.put(‘yang')

q.put(4)

q.put([‘yan','xing'])

在队列中取值get()

默认的队列是先进先出的

>>> q.get()
‘yang'
>>> q.get()
4
>>> q.get()
[‘yan', ‘xing']

当一个队列为空的时候如果再用get取则会堵塞,所以取队列的时候一般是用到

get_nowait()方法,这种方法在向一个空队列取值的时候会抛一个Empty异常

所以更常用的方法是先判断一个队列是否为空,如果不为空则取值

队列中常用的方法

Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.get([block[, timeout]]) 获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
非阻塞 Queue.put(item) 写入队列,timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)

二、multiprocessing中使用子进程概念

from multiprocessing import Process

可以通过Process来构造一个子进程

p = Process(target=fun,args=(args))

再通过p.start()来启动子进程

再通过p.join()方法来使得子进程运行结束后再执行父进程

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
 print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Process(target=run_proc, args=('test',))
 print 'Process will start.'
 p.start()
 p.join()
 print 'Process end.'

三、在multiprocessing中使用pool

如果需要多个子进程时可以考虑使用进程池(pool)来管理

from multiprocessing import Pool

from multiprocessing import Pool
import os, time

def long_time_task(name):
 print 'Run task %s (%s)...' % (name, os.getpid())
 start = time.time()
 time.sleep(3)
 end = time.time()
 print 'Task %s runs %0.2f seconds.' % (name, (end - start))

if __name__=='__main__':
 print 'Parent process %s.' % os.getpid()
 p = Pool()
 for i in range(5):
  p.apply_async(long_time_task, args=(i,))
 print 'Waiting for all subprocesses done...'
 p.close()
 p.join()
 print 'All subprocesses done.'

pool创建子进程的方法与Process不同,是通过

p.apply_async(func,args=(args))实现,一个池子里能同时运行的任务是取决你电脑的cpu数量,如我的电脑现在是有4个cpu,那会子进程task0,task1,task2,task3可以同时启动,task4则在之前的一个某个进程结束后才开始

上面的程序运行后的结果其实是按照上图中1,2,3分开进行的,先打印1,3秒后打印2,再3秒后打印3

代码中的p.close()是关掉进程池子,是不再向里面添加进程了,对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

当时也可以是实例pool的时候给它定义一个进程的多少

如果上面的代码中p=Pool(5)那么所有的子进程就可以同时进行

三、多个子进程间的通信

多个子进程间的通信就要采用第一步中说到的Queue,比如有以下的需求,一个子进程向队列中写数据,另外一个进程从队列中取数据,

#coding:gbk

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
 for value in ['A', 'B', 'C']:
  print 'Put %s to queue...' % value
  q.put(value)
  time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
 while True:
  if not q.empty():
   value = q.get(True)
   print 'Get %s from queue.' % value
   time.sleep(random.random())
  else:
   break

if __name__=='__main__':
 # 父进程创建Queue,并传给各个子进程:
 q = Queue()
 pw = Process(target=write, args=(q,))
 pr = Process(target=read, args=(q,))
 # 启动子进程pw,写入:
 pw.start()
 # 等待pw结束:
 pw.join()
 # 启动子进程pr,读取:
 pr.start()
 pr.join()
 # pr进程里是死循环,无法等待其结束,只能强行终止:
 print
 print '所有数据都写入并且读完'

四、关于上面代码的几个有趣的问题

if __name__=='__main__':
 # 父进程创建Queue,并传给各个子进程:
 q = Queue()
 p = Pool()
 pw = p.apply_async(write,args=(q,))
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()

 print
 print '所有数据都写入并且读完'

如果main函数写成上面的样本,本来我想要的是将会得到一个队列,将其作为参数传入进程池子里的每个子进程,但是却得到

RuntimeError: Queue objects should only be shared between processes through inheritance

的错误,查了下,大意是队列对象不能在父进程与子进程间通信,这个如果想要使用进程池中使用队列则要使用multiprocess的Manager类

if __name__=='__main__':
 manager = multiprocessing.Manager()
 # 父进程创建Queue,并传给各个子进程:
 q = manager.Queue()
 p = Pool()
 pw = p.apply_async(write,args=(q,))
 time.sleep(0.5)
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()

 print
 print '所有数据都写入并且读完'

这样这个队列对象就可以在父进程与子进程间通信,不用池则不需要Manager,以后再扩展multiprocess中的Manager类吧

关于锁的应用,在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在manager对象中的锁

#coding:gbk

from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random

# 写数据进程执行的代码:
def write(q,lock):
 lock.acquire() #加上锁
 for value in ['A', 'B', 'C']:
  print 'Put %s to queue...' % value
  q.put(value)
 lock.release() #释放锁 

# 读数据进程执行的代码:
def read(q):
 while True:
  if not q.empty():
   value = q.get(False)
   print 'Get %s from queue.' % value
   time.sleep(random.random())
  else:
   break

if __name__=='__main__':
 manager = multiprocessing.Manager()
 # 父进程创建Queue,并传给各个子进程:
 q = manager.Queue()
 lock = manager.Lock() #初始化一把锁
 p = Pool()
 pw = p.apply_async(write,args=(q,lock))
 pr = p.apply_async(read,args=(q,))
 p.close()
 p.join()

 print
 print '所有数据都写入并且读完'

以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索python
, queue
, 多进程
多进程并发
python 多进程 queue、python 多进程、python 多进程并发、python 多线程 多进程、python多进程共享变量,以便于您获取更多的相关知识。

时间: 2024-10-27 04:20:15

简单谈谈python中的Queue与多进程_python的相关文章

简单谈谈Python中的反转字符串问题_python

按单词反转字符串是一道很常见的面试题.在Python中实现起来非常简单. def reverse_string_by_word(s): lst = s.split() # split by blank space by default return ' '.join(lst[::-1]) s = 'Power of Love' print reverse_string_by_word(s) # Love of Power s = 'Hello World!' print reverse_stri

简单谈谈Python中函数的可变参数_python

前言 在Python中定义函数,可以用必选参数.默认参数.可变参数和关键字参数,这4种参数都可以一起使用,或者只用其中某些,但是请注意,参数定义的顺序必须是:必选参数.默认参数.可变参数和关键字参数. 可变参数( * ) 可变参数,顾名思义,它的参数是可变的,比如列表.字典等.如果我们需要函数处理可变数量参数的时候,就可以使用可变参数. 我们在查看很多Python源码时,经常会看到 某函数(*参数1, **参数2)这样的函数定义,这个*参数和**参数就是可变参数,一时会让人有点费解.其实只要把函

简单谈谈python中的多进程_python

进程是由系统自己管理的. 1:最基本的写法 from multiprocessing import Pool def f(x): return x*x if __name__ == '__main__': p = Pool(5) print(p.map(f, [1, 2, 3])) [1, 4, 9] 2.实际上是通过os.fork的方法产生进程的 unix中,所有进程都是通过fork的方法产生的. multiprocessing Process os info(title): title ,

简单谈谈Python中的闭包_python

Python中的闭包 1. 闭包的概念 首先还得从基本概念说起,什么是闭包呢?来看下维基上的解释: 复制代码 代码如下: 在计算机科学中,闭包(Closure)是词法闭包(Lexical Closure)的简称,是引用了自由变量的函数.这个被引用的自由变量将和这个函数一同存在,即使已经离开了创造它的环境也不例外.所以,有另一种说法认为闭包是由函数和与其相关的引用环境组合而成的实体.闭包在运行时可以有多个实例,不同的引用环境和相同的函数组合可以产生不同的实例. .... 上面提到了两个关键的地方:

练习--python中的Queue与多进程(multiprocessing)

按官方说法: This module is OBSOLETE and is only provided on PyPI to support old projects that still use it. Please DO NOT USE IT FOR NEW PROJECTS! Use modern alternatives like the multiprocessing module in the standard library or even an asynchroneous app

简单介绍Python中的RSS处理_python

RSS 是一个可用多种扩展来表示的缩写:"RDF 站点摘要(RDF Site Summary)"."真正简单的辛迪加(Really Simple Syndication)"."丰富站点摘要(Rich Site Summary)",也许还能用其他扩展来表示.在如此混乱的名称背后,您会发现和这样一个平凡的技术领域相关的故事多得令人吃惊.RSS 是用于分发 Web 站点上的内容的摘要的一种简单的 XML 格式.它能够用于共享各种各样的信息,包括(但不是

简单介绍Python中的JSON模块_python

(一)什么是json: JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式.易于人阅读和编写.同时也易于机器解析和生成.它基于JavaScript Programming Language, Standard ECMA-262 3rd Edition - December 1999的一个子集.JSON采用完全独立于语言的文本格式,但是也使用了类似于C语言家族的习惯(包括C, C++, C#, Java, JavaScript, Perl, Python等

Python 中的queue模块

本文转载至: https://blog.linuxeye.com/334.html 作为标准库中的queue模块,在实际的开发中也是很常见的.其实数据结构这种东西,在任何的编程语言中都是相通的,无非表达方式有些不同罢了.那么, 今天就来看看如何在Python中使用队列. 创建一个队列 import Queue q = Queue.Queue(maxsize = 10) 可见对于模块Queue里面的类Queue而言,是可以在初始化的时候指定队列大小的.队列长度可为无限或者有限.由可选参数maxsi

简单谈谈Python流程控制语句_python

人们常说人生就是一个不断做选择题的过程:有的人没得选,只有一条路能走:有的人好一点,可以二选一:有些能力好或者家境好的人,可以有更多的选择:还有一些人在人生的迷茫期会在原地打转,找不到方向.对于相信有上帝的人来讲,这就好像是上帝事先为我们制定好了人生路线,也好像是那些神仙为唐曾师徒的取经之路提前设置的重重磨难,上帝和神仙掌控了一切.编程语言可以模拟人类生活的方方面面,程序员就像上帝和神仙一样可以通过编程语言中特殊的关键字控制程序的执行过程,这些关键字组成的就是流程控制语句. 编程语言中的流程控制