python队列模块Queue用法详解

一、初识Queue模块

Queue模块实现了多生产者、多消费者队列。它特别适用于信息必须在多个线程间安全地交换的多线程程序中。这个模块中的Queue类实现了所有必须的锁语义。它依赖于Python中线程支持的可用性;参见threading模块。

模块实现了三类队列:FIFO(First In First Out,先进先出,默认为该队列)、LIFO(Last In First Out,后进先出)、基于优先级的队列。以下为其常用方法:

先进先出  q = Queue.Queue(maxsize)
后进先出  a = Queue.LifoQueue(maxsize)
优先级  Queue.PriorityQueue(maxsize)
Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.full 与 maxsize 大小对应
Queue.put(item) 写入队列,timeout等待时间   非阻塞
Queue.get([block[, timeout]]) 获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
Queue.put_nowait(item) 相当Queue.put(item, False)
Queue.task_done() 在完成一项工作之后,函数向任务已经完成的队列发送一个信号
Queue.join(): 实际上意味着等到队列为空,再执行别的操作
更详细部分可以参看python标准库之Queue模块介绍。

二、队列示列

1、FIFO(先进先出)

import Queue
q = Queue.Queue()
for i in range(5):
    q.put(i)
while not q.empty():
    print q.get()
其输出结果如下:

[root@361way queue]# python fifo.py

0
1
2
3
4

其输出顺序与进入顺序相同。

2、LIFO(后进先出)

import Queue
q = Queue.LifoQueue()
for i in range(5):
    q.put(i)
while not q.empty():
    print q.get()
执行结果如下:

import Queue
q = Queue.LifoQueue()
for i in range(5):
    q.put(i)
while not q.empty():
    print q.get()

3、带优先级的队列

import Queue
class Job(object):
    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print 'New job:', description
        return
    def __cmp__(self, other):
        return cmp(self.priority, other.priority)
q = Queue.PriorityQueue()
q.put( Job(3, 'Mid-level job') )
q.put( Job(10, 'Low-level job') )
q.put( Job(1, 'Important job') )
while not q.empty():
    next_job = q.get()
    print 'Processing job:', next_job.description

执行结果如下:

[root@361way queue]# python Queue_priority.py
New job: Mid-level job
New job: Low-level job
New job: Important job
Processing job: Important job
Processing job: Mid-level job
Processing job: Low-level job
从上面的执行结果可以看出,优先级值设置越小,越先执行。另外这里是以单线程为例的,在多thread的示例中,多个线程同时get()  item 时,这时就可以根据优先级决定哪一个任务先执行。

三、队列与线程

在实际使用队列是与线程结合在一起的。这里列几个队列与线程的代码示例:

from Queue import *
from threading import Thread
import sys
'''this function will process the items in the queue, in serial'''
def processor():
    while True:
        if queue.empty() == True:
            print "the Queue is empty!"
            sys.exit(1)
        try:
            job = queue.get()
            print "I'm operating on job item: %s"%(job)
            queue.task_done()
        except:
            print "Failed to operate on job"
'''set variables'''
queue = Queue()
threads = 4
'''a list of job items. you would want this to be more advanced,
like reading from a file or database'''
jobs = [ "job1", "job2", "job3" ]
'''iterate over jobs and put each into the queue in sequence'''
#for job in jobs:
for job in range(100):
     print "inserting job into the queue: %s"%(job)
     queue.put(job)
'''start some threads, each one will process one job from the queue'''
#for i in range(100):
for i in range(threads):
     th = Thread(target=processor)
     th.setDaemon(True)
     th.start()
'''wait until all jobs are processed before quitting'''
queue.join()

需要注意的是processer函数里的“ while True:”行 ,如果没了这行,当线程(thread)数小于队列数时,第一轮循环完后就会卡住,不执行后面的循环了。所以加上该行,就相当于开始了一个死循环,直到所有的队列结束时,队列为空,循环结束。

示例2:

[root@361way tmp]# python queue-example-1.py
task 0 finished
task 1 finished
task 3 finished
task 2 finished
task 5 finished
task 4 finished
task 6 finished
task 7 finished
task 9 finished
task 8 finished
[root@361way tmp]# more queue-example-1.py
# File: queue-example-1.py
import threading
import Queue
import time, random
WORKERS = 2
class Worker(threading.Thread):
    def __init__(self, queue):
        self.__queue = queue
        threading.Thread.__init__(self)
    def run(self):
        while 1:
            item = self.__queue.get()
            if item is None:
                break # reached end of queue
            # pretend we're doing something that takes 10-100 ms
            time.sleep(random.randint(10, 100) / 1000.0)
            print "task", item, "finished"
#
# try it
queue = Queue.Queue(0)
for i in range(WORKERS):
    Worker(queue).start() # start a worker
for i in range(10):
    queue.put(i)
for i in range(WORKERS):
    queue.put(None) # add end-of-queue markers

python queue模块有三种队列:

1、python queue模块的FIFO队列先进先出。
2、LIFO类似于堆。即先进后出。
3、还有一种是优先级队列级别越低越先出来。

针对这三种队列分别有三个构造函数:

1、class Queue.Queue(maxsize) FIFO
2、class Queue.LifoQueue(maxsize) LIFO
3、class Queue.PriorityQueue(maxsize) 优先级队列

介绍一下此包中的常用方法:

Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.full 与 maxsize 大小对应
Queue.get([block[, timeout]])获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
非阻塞 Queue.put(item) 写入队列,timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)
Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
Queue.join() 实际上意味着等到队列为空,再执行别的操作
 
 
附上一个例子:

#coding:utf-8

import Queue
import threading
import time
import random

q = Queue.Queue(0) #当有多个线程共享一个东西的时候就可以用它了
NUM_WORKERS = 3

class MyThread(threading.Thread):

    def __init__(self,input,worktype):
       self._jobq = input
       self._work_type = worktype
       threading.Thread.__init__(self)

    def run(self):
       while True:
           if self._jobq.qsize() > 0:
               self._process_job(self._jobq.get(),self._work_type)
           else:break

    def _process_job(self, job, worktype):
       doJob(job,worktype)

def doJob(job, worktype):
   time.sleep(random.random() * 3)
    print"doing",job," worktype ",worktype

if __name__ == '__main__':
    print "begin...."
    for i inrange(NUM_WORKERS * 2):
       q.put(i) #放入到任务队列中去
    print "job qsize:",q.qsize()

    for x inrange(NUM_WORKERS):
       MyThread(q,x).start()

python下多线程的思考

先来个例子:

 
import Queue,threading,time,random
 
 
 
class consumer(threading.Thread):
    def __init__(self,que):
        threading.Thread.__init__(self)
        self.daemon = False
        self.queue = que
    def run(self):
        while True:
            if self.queue.empty():
                break
            item = self.queue.get()
            #processing the item 
            time.sleep(item)
            print self.name,item
            self.queue.task_done()
        return
que = Queue.Queue()
for x in range(10):
    que.put(random.random() * 10, True, None)
consumers = [consumer(que) for x in range(3)]
 
for c in consumers:
    c.start()
que.join()
import Queue,threading,time,random

 

class consumer(threading.Thread):
    def __init__(self,que):
        threading.Thread.__init__(self)
        self.daemon = False
        self.queue = que
    def run(self):
        while True:
            if self.queue.empty():
                break
            item = self.queue.get()
            #processing the item
            time.sleep(item)
            print self.name,item
            self.queue.task_done()
        return
que = Queue.Queue()
for x in range(10):
    que.put(random.random() * 10, True, None)
consumers = [consumer(que) for x in range(3)]

for c in consumers:
    c.start()
que.join()

 

代码的功能是产生10个随机数(0~10范围),sleep相应时间后输出数字和线程名称

这段代码里,是一个快速生产者(产生10个随机数),3个慢速消费者的情况。

在这种情况下,先让三个consumers跑起来,然后主线程用que.join()阻塞。

当三个线程发现队列都空时,各自的run函数返回,三个线程结束。同时主线程的阻塞打开,全部程序结束。

 
而对于慢速生产者和快速消费者而言,代码如下:

 
import Queue,threading,time,random
 
 
class consumer(threading.Thread):
    def __init__(self,que):
        threading.Thread.__init__(self)
        self.daemon = False
        self.queue = que
    def run(self):
        while True:
            item = self.queue.get()
            if item == None:
                break
            #processing the item 
            print self.name,item
            self.queue.task_done()
        self.queue.task_done()
        return
que = Queue.Queue()
 
consumers = [consumer(que) for x in range(3)]
for c in consumers:
    c.start()
for x in range(10):
    item = random.random() * 10
    time.sleep(item)
    que.put(item, True, None)
 
 
que.put(None)
que.put(None)
que.put(None)
que.join()
import Queue,threading,time,random

 

class consumer(threading.Thread):
    def __init__(self,que):
        threading.Thread.__init__(self)
        self.daemon = False
        self.queue = que
    def run(self):
        while True:
            item = self.queue.get()
            if item == None:
                break
            #processing the item
            print self.name,item
            self.queue.task_done()
        self.queue.task_done()
        return
que = Queue.Queue()

consumers = [consumer(que) for x in range(3)]
for c in consumers:
    c.start()
for x in range(10):
    item = random.random() * 10
    time.sleep(item)
    que.put(item, True, None)

que.put(None)
que.put(None)
que.put(None)
que.join()

 

这种情况下,快速消费者在get时需要阻塞(否则返回了这线程就结束了~)因此对于停止整个程序,使用的是None标记,让子线程遇到None便返回结束。

因为消费速度大于产生速度,因此先运行子线程等待队列加入新的元素,然后再慢速地添加任务。

注意最后put(None)三次,是因为每个线程返回都会取出一个None,都要这样做才可以使三个线程全部停止。当然有种更简单粗暴的方法,就是把子线程设置为deamon,一但生产完成,开始que.join()阻塞直至队列空就结束主线程,子线程虽然在阻塞等待队列也会因为deamon属性而被强制关闭。。。。

 
 

本文举了2个单一生产者多消费者的例子。参考资料是<python参考手册>第三版,上面有单c和单p的代码。

有关多线程的函数如put,join什么的, 还是自己先看书学好概念吧~

时间: 2024-11-03 02:10:45

python队列模块Queue用法详解的相关文章

Python中内置的日志模块logging用法详解_python

logging模块简介 Python的logging模块提供了通用的日志系统,可以方便第三方模块或者是应用使用.这个模块提供不同的日志级别,并可以采用不同的方式记录日志,比如文件,HTTP GET/POST,SMTP,Socket等,甚至可以自己实现具体的日志记录方式. logging模块与log4j的机制是一样的,只是具体的实现细节不同.模块提供logger,handler,filter,formatter. logger:提供日志接口,供应用代码使用.logger最长用的操作有两类:配置和发

Python字典简介以及用法详解_python

#!/usr/bin/env python # -*- coding:utf-8 -*- """ 老规矩以下方法环境2.7.x,请3.x以上版本的朋友记得格式print(输出内容放入括号内) 字典的基本组成以及用法 dict = { key : value } dict[ key ] = value 首先来说说字典是由key键与value值一一对应来组成字典的基本结构 key键不能由list列表,dict字典等多元素命名, key是唯一属性又可以称一对一服务,key相同但只会

python 迭代器和生成器用法详解

python 迭代器 迭代器是一个实现了迭代器协议的对象,Python中的迭代器协议就是有next方法的对象会前进到下一结果,而在一系列结果的末尾是,则会引发StopIteration. >>> l = range(2) >>> i = iter(l) >>> i <listiterator object at 0x10a38d990> >>> i.next() 0 >>> i.next() 1 >

python中yield的用法详解

列表解析: [expr for iter_var in iterable if cond_expr] 生成器表达式: (expr for iter_var in iterable if cond_expr) 生成器最大的却别是它并不返回一个真正的数组 rows = [1,2,3,17]   def cols():     yield 56     yield 2     yield 1     abc = cols()   for i in abc:     print i   print ro

Python中itertools模块用法详解_python

本文实例讲述了Python中itertools模块用法,分享给大家供大家参考.具体分析如下: 一般来说,itertools模块包含创建有效迭代器的函数,可以用各种方式对数据进行循环操作,此模块中的所有函数返回的迭代器都可以与for循环语句以及其他包含迭代器(如生成器和生成器表达式)的函数联合使用. chain(iter1, iter2, ..., iterN): 给出一组迭代器(iter1, iter2, ..., iterN),此函数创建一个新迭代器来将所有的迭代器链接起来,返回的迭代器从it

Python中线程编程之threading模块的使用详解

  这篇文章主要介绍了Python中线程编程之threading模块的使用详解,由于GIL的存在,线程一直是Python编程中的焦点问题,需要的朋友可以参考下 threading.Thread Thread 是threading模块中最重要的类之一,可以使用它来创建线程.有两种方式来创建线程:一种是通过继承Thread类,重写它的run方法;另一种是创建一个threading.Thread对象,在它的初始化函数(__init__)中将可调用对象作为参数传入.下面分别举例说明.先来看看通过继承th

Python中的下划线详解

  这篇文章主要介绍了Python中的下划线详解,本文讲解了单个下划线直接做变量名.单下划线前缀的名称.双下划线前缀的名称等内容,需要的朋友可以参考下 这篇文章讨论Python中下划线_的使用.跟Python中很多用法类似,下划线_的不同用法绝大部分(不全是)都是一种惯例约定. 一. 单个下划线直接做变量名(_) 主要有三种情况: 1. 解释器中 _符号是指交互解释器中最后一次执行语句的返回结果.这种用法最初出现在CPython解释器中,其他解释器后来也都跟进了. 代码如下: >>> _

php9个超全局变量的用法详解(二)

今天来讲一下$_GET()与$_POST(). 其实很容易理解,根据表面意思就可以看得出来,是获得post与get表单的数据,其实也正是如此,来点专业的话来讲, $_GET 变量是一个数组,内容是由 HTTP GET 方法发送的变量名称和值. $_GET 变量用于收集来自 method="get" 的表单中的值.从带有 GET 方法的表单发送的信息,对任何人都是可见的(会显示在浏览器的地址栏),并且对发送的信息量也有限制(最多 100 个字符).好,看个例子,上简单登录界面的代码: 登

php9个超全局变量的用法详解(一)

PHP 中的许多预定义变量都是"超全局的",这意味着它们在一个脚本的全部作用域中都可用.在函数或方法中无需执行 global $variable; 就可以访问它们. 这些超全局变量是: $GLOBALS $_SERVER $_GET $_POST $_FILES $_COOKIE $_SESSION $_REQUEST $_ENV 1.先看一下$GLOBALS,它是包含了全部变量的全局组合数组,什么意思呢,看一个c语言程序 int main() { int a = 3; void t(