Python一行代码完成并行任务

众所周知,Python的并行处理能力很不理想。我认为如果不考虑线程和GIL的标准参数(它们大多是合法的),其原因不是因为技术不到位,而是我们的使用方法不恰当。大多数关于Python线程和多进程的教材虽然都很出色,但是内容繁琐冗长。它们的确在开篇铺陈了许多有用信息,但往往都不会涉及真正能提高日常工作的部分。

经典例子

DDG上以“Python threading tutorial (Python线程教程)”为关键字的热门搜索结果表明:几乎每篇文章中给出的例子都是相同的类+队列。

事实上,它们就是以下这段使用producer/Consumer来处理线程/多进程的代码示例:


  1. #Example.py 
  2.  
  3. ''' 
  4.  
  5.     Standard Producer/Consumer Threading Pattern 
  6.  
  7. ''' 
  8.  
  9.   
  10.  
  11. import time 
  12.  
  13. import threading 
  14.  
  15. import Queue 
  16.  
  17.   
  18.  
  19. class Consumer(threading.Thread): 
  20.  
  21. def __init__(self, queue): 
  22.  
  23.     threading.Thread.__init__(self) 
  24.  
  25.     self._queue = queue 
  26.  
  27.   
  28.  
  29. def run(self): 
  30.  
  31.     while True: 
  32.  
  33.         # queue.get() blocks the current thread until 
  34.  
  35.         # an item is retrieved. 
  36.  
  37.         msg = self._queue.get() 
  38.  
  39.         # Checks if the current message is 
  40.  
  41.         # the "Poison Pill" 
  42.  
  43.         if isinstance(msg, str) and msg == 'quit': 
  44.  
  45.             # if so, exists the loop 
  46.  
  47.             break 
  48.  
  49.         # "Processes" (or in our case, prints) the queue item 
  50.  
  51.         print "I'm a thread, and I received %s!!" % msg 
  52.  
  53.         # Always be friendly! 
  54.  
  55.     print 'Bye byes!' 
  56.  
  57.   
  58.  
  59. def Producer(): 
  60.  
  61.     # Queue is used to share items between 
  62.  
  63.     # the threads. 
  64.  
  65.     queue = Queue.Queue() 
  66.  
  67.   
  68.  
  69.     # Create an instance of the worker 
  70.  
  71.     worker = Consumer(queue) 
  72.  
  73.     # start calls the internal run() method to 
  74.  
  75.     # kick off the thread 
  76.  
  77.     worker.start() 
  78.  
  79.   
  80.  
  81.     # variable to keep track of when we started 
  82.  
  83.     start_time = time.time() 
  84.  
  85.     # While under 5 seconds.. 
  86.  
  87.     while time.time() - start_time < 5: 
  88.  
  89.         # "Produce" a piece of work and stick it in 
  90.  
  91.         # the queue for the Consumer to process 
  92.  
  93.         queue.put('something at %s' % time.time()) 
  94.  
  95.     # Sleep a bit just to avoid an absurd number of messages 
  96.  
  97.     time.sleep(1) 
  98.  
  99.   
  100.  
  101.     # This the "poison pill" method of killing a thread. 
  102.  
  103.     queue.put('quit') 
  104.  
  105.     # wait for the thread to close down 
  106.  
  107.     worker.join() 
  108.  
  109.   
  110.  
  111. if __name__ == '__main__': 
  112.  
  113. Producer()  

唔…….感觉有点像Java。

我现在并不想说明使用Producer / Consume来解决线程/多进程的方法是错误的——因为它肯定正确,而且在很多情况下它是最佳方法。但我不认为这是平时写代码的最佳选择。

它的问题所在(个人观点)

首先,你需要创建一个样板式的铺垫类。然后,你再创建一个队列,通过其传递对象和监管队列的两端来完成任务。(如果你想实现数据的交换或存储,通常还涉及另一个队列的参与)。

Worker越多,问题越多。

接下来,你应该会创建一个worker类的pool来提高Python的速度。下面是IBM tutorial给出的较好的方法。这也是程序员们在利用多线程检索web页面时的常用方法。


  1. #Example2.py 
  2.  
  3. """ 
  4.  
  5. A more realistic thread pool example 
  6.  
  7. """ 
  8.  
  9.   
  10.  
  11. import time 
  12.  
  13. import threading 
  14.  
  15. import Queue 
  16.  
  17. import urllib2 
  18.  
  19.   
  20.  
  21. class Consumer(threading.Thread): 
  22.  
  23.     def __init__(self, queue): 
  24.  
  25.         threading.Thread.__init__(self) 
  26.  
  27.         self._queue = queue 
  28.  
  29.   
  30.  
  31.     def run(self): 
  32.  
  33.         while True: 
  34.  
  35.             content = self._queue.get() 
  36.  
  37.             if isinstance(content, str) and content == "quit": 
  38.  
  39.                 break 
  40.  
  41.             response = urllib2.urlopen(content) 
  42.  
  43.        print "Bye byes!" 
  44.  
  45.   
  46.  
  47. def Producer(): 
  48.  
  49.     urls = [ 
  50.  
  51.          "http://www.python.org', 'http://www.yahoo.com", 
  52.  
  53.         "http://www.scala.org', 'http://www.google.com", 
  54.  
  55.     # etc.. 
  56.  
  57.     ] 
  58.  
  59.     queue = Queue.Queue() 
  60.  
  61.     worker_threads = build_worker_pool(queue, 4) 
  62.  
  63.     start_time = time.time() 
  64.  
  65.   
  66.  
  67.     # Add the urls to process 
  68.  
  69.     for url in urls: 
  70.  
  71.         queue.put(url)   
  72.  
  73.     # Add the poison pillv 
  74.  
  75.     for worker in worker_threads: 
  76.  
  77.         queue.put("quit") 
  78.  
  79.     for worker in worker_threads: 
  80.  
  81.         worker.join() 
  82.  
  83.   
  84.  
  85.     print "Done! Time taken: {}".format(time.time() - start_time) 
  86.  
  87.   
  88.  
  89. def build_worker_pool(queue, size): 
  90.  
  91.     workers = [] 
  92.  
  93.     for _ in range(size): 
  94.  
  95.         worker = Consumer(queue) 
  96.  
  97.         worker.start() 
  98.  
  99.         workers.append(worker) 
  100.  
  101.     return workers 
  102.  
  103.   
  104.  
  105. if __name__ == '__main__': 
  106.  
  107.     Producer()  

它的确能运行,但是这些代码多么复杂阿!它包括了初始化方法、线程跟踪列表以及和我一样容易在死锁问题上出错的人的噩梦——大量的join语句。而这些还仅仅只是繁琐的开始!

我们目前为止都完成了什么?基本上什么都没有。上面的代码几乎一直都只是在进行传递。这是很基础的方法,很容易出错(该死,我刚才忘了在队列对象上还需要调用task_done()方法(但是我懒得修改了)),性价比很低。还好,我们还有更好的方法。

介绍:Map

Map是一个很棒的小功能,同时它也是Python并行代码快速运行的关键。给不熟悉的人讲解一下吧,map是从函数语言Lisp来的。map函数能够按序映射出另一个函数。例如


  1. urls = ['http://www.yahoo.com', 'http://www.reddit.com'] 
  2.  
  3. results = map(urllib2.urlopen, urls)  

这里调用urlopen方法来把调用结果全部按序返回并存储到一个列表里。就像:


  1. results = [] 
  2.  
  3. for url in urls: 
  4.  
  5. results.append(urllib2.urlopen(url))  

Map按序处理这些迭代。调用这个函数,它就会返回给我们一个按序存储着结果的简易列表。

为什么它这么厉害呢?因为只要有了合适的库,map能使并行运行得十分流畅!

有两个能够支持通过map函数来完成并行的库:一个是multiprocessing,另一个是鲜为人知但功能强大的子文件:multiprocessing.dummy。

题外话:这个是什么?你从来没听说过dummy多进程库?我也是最近才知道的。它在多进程的说明文档里面仅仅只被提到了一句。而且那一句就是大概让你知道有这么个东西的存在。我敢说,这样几近抛售的做法造成的后果是不堪设想的!

Dummy就是多进程模块的克隆文件。唯一不同的是,多进程模块使用的是进程,而dummy则使用线程(当然,它有所有Python常见的限制)。也就是说,数据由一个传递给另一个。这能够使得数据轻松的在这两个之间进行前进和回跃,特别是对于探索性程序来说十分有用,因为你不用确定框架调用到底是IO
还是CPU模式。

准备开始

要做到通过map函数来完成并行,你应该先导入装有它们的模块:


  1. from multiprocessing import Pool 
  2.  
  3. from multiprocessing.dummy import Pool as ThreadPool  

再初始化:


  1. pool = ThreadPool() 

这简单的一句就能代替我们的build_worker_pool 函数在example2.py中的所有工作。换句话说,它创建了许多有效的worker,启动它们来为接下来的工作做准备,以及把它们存储在不同的位置,方便使用。

Pool对象需要一些参数,但最重要的是:进程。它决定pool中的worker数量。如果你不填的话,它就会默认为你电脑的内核数值。

如果你在CPU模式下使用多进程pool,通常内核数越大速度就越快(还有很多其它因素)。但是,当进行线程或者处理网络绑定之类的工作时,情况会比较复杂所以应该使用pool的准确大小。


  1. pool = ThreadPool(4) # Sets the pool size to 4 

如果你运行过多线程,多线程间的切换将会浪费许多时间,所以你最好耐心调试出最适合的任务数。

我们现在已经创建了pool对象,马上就能有简单的并行程序了,所以让我们重新写example2.py中的url opener吧!


  1. import urllib2 
  2.  
  3. from multiprocessing.dummy import Pool as ThreadPool 
  4.  
  5.   
  6.  
  7. urls = [ 
  8.  
  9. 'http://www.python.org', 
  10.  
  11. 'http://www.python.org/about/', 
  12.  
  13. 'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html', 
  14.  
  15. 'http://www.python.org/doc/', 
  16.  
  17. 'http://www.python.org/download/', 
  18.  
  19. 'http://www.python.org/getit/', 
  20.  
  21. 'http://www.python.org/community/', 
  22.  
  23. 'https://wiki.python.org/moin/', 
  24.  
  25. 'http://planet.python.org/', 
  26.  
  27. 'https://wiki.python.org/moin/LocalUserGroups', 
  28.  
  29. 'http://www.python.org/psf/', 
  30.  
  31. 'http://docs.python.org/devguide/', 
  32.  
  33. 'http://www.python.org/community/awards/' 
  34.  
  35. # etc.. 
  36.  
  37.  
  38.   
  39.  
  40. # Make the Pool of workers 
  41.  
  42. pool = ThreadPool(4) 
  43.  
  44. # Open the urls in their own threads 
  45.  
  46. # and return the results 
  47.  
  48. results = pool.map(urllib2.urlopen, urls) 
  49.  
  50. #close the pool and wait for the work to finish 
  51.  
  52. pool.close() 
  53.  
  54. pool.join()  

看吧!这次的代码仅用了4行就完成了所有的工作。其中3句还是简单的固定写法。调用map就能完成我们前面例子中40行的内容!为了更形象地表明两种方法的差异,我还分别给它们运行的时间计时。

结果:

相当出色!并且也表明了为什么要细心调试pool的大小。在这里,只要大于9,就能使其运行速度加快。

实例2:

生成成千上万的缩略图

我们在CPU模式下来完成吧!我工作中就经常需要处理大量的图像文件夹。其任务之一就是创建缩略图。这在并行任务中已经有很成熟的方法了。

基础的单线程创建


  1. import os 
  2.  
  3. import PIL 
  4.  
  5.   
  6.  
  7. from multiprocessing import Pool 
  8.  
  9. from PIL import Image 
  10.  
  11.   
  12.  
  13. SIZE = (75,75) 
  14.  
  15. SAVE_DIRECTORY = 'thumbs' 
  16.  
  17.   
  18.  
  19. def get_image_paths(folder): 
  20.  
  21. return (os.path.join(folder, f) 
  22.  
  23. for f in os.listdir(folder) 
  24.  
  25. if 'jpeg' in f) 
  26.  
  27.   
  28.  
  29. def create_thumbnail(filename): 
  30.  
  31. im = Image.open(filename) 
  32.  
  33. im.thumbnail(SIZE, Image.ANTIALIAS) 
  34.  
  35. base, fname = os.path.split(filename) 
  36.  
  37. save_path = os.path.join(base, SAVE_DIRECTORY, fname) 
  38.  
  39. im.save(save_path) 
  40.  
  41.   
  42.  
  43. if __name__ == '__main__': 
  44.  
  45. folder = os.path.abspath( 
  46.  
  47. '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840') 
  48.  
  49. os.mkdir(os.path.join(folder, SAVE_DIRECTORY)) 
  50.  
  51.   
  52.  
  53. images = get_image_paths(folder) 
  54.  
  55.   
  56.  
  57. for image in images: 
  58.  
  59.              create_thumbnail(Image)  

对于一个例子来说,这是有点难,但本质上,这就是向程序传递一个文件夹,然后将其中的所有图片抓取出来,并最终在它们各自的目录下创建和储存缩略图。

我的电脑处理大约6000张图片用了27.9秒。

如果我们用并行调用map来代替for循环的话:


  1. import os 
  2.  
  3. import PIL 
  4.  
  5.   
  6.  
  7. from multiprocessing import Pool 
  8.  
  9. from PIL import Image 
  10.  
  11.   
  12.  
  13. SIZE = (75,75) 
  14.  
  15. SAVE_DIRECTORY = 'thumbs' 
  16.  
  17.   
  18.  
  19. def get_image_paths(folder): 
  20.  
  21. return (os.path.join(folder, f) 
  22.  
  23. for f in os.listdir(folder) 
  24.  
  25. if 'jpeg' in f) 
  26.  
  27.   
  28.  
  29. def create_thumbnail(filename): 
  30.  
  31. im = Image.open(filename) 
  32.  
  33. im.thumbnail(SIZE, Image.ANTIALIAS) 
  34.  
  35. base, fname = os.path.split(filename) 
  36.  
  37. save_path = os.path.join(base, SAVE_DIRECTORY, fname) 
  38.  
  39. im.save(save_path) 
  40.  
  41.   
  42.  
  43. if __name__ == '__main__': 
  44.  
  45. folder = os.path.abspath( 
  46.  
  47. '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840') 
  48.  
  49. os.mkdir(os.path.join(folder, SAVE_DIRECTORY)) 
  50.  
  51.   
  52.  
  53. images = get_image_paths(folder) 
  54.  
  55.   
  56.  
  57. pool = Pool() 
  58.  
  59.         pool.map(create_thumbnail,images) 
  60.  
  61.         pool.close() 
  62.  
  63.         pool.join()  

5.6秒!

对于只改变了几行代码而言,这是大大地提升了运行速度。这个方法还能更快,只要你将cpu 和
io的任务分别用它们的进程和线程来运行——但也常造成死锁。总之,综合考虑到
map这个实用的功能,以及人为线程管理的缺失,我觉得这是一个美观,可靠还容易debug的方法。

好了,文章结束了。一行完成并行任务。

作者:佚名

来源:51CTO

时间: 2024-11-02 22:07:35

Python一行代码完成并行任务的相关文章

调试Python程序代码的几种方法总结

  这篇文章主要介绍了调试Python程序代码的几种方法总结,文中代码基于Python2.x版本,需要的朋友可以参考下 程序能一次写完并正常运行的概率很小,基本不超过1%.总会有各种各样的bug需要修正.有的bug很简单,看看错误信息就知道,有的bug很复杂,我们需要知道出错时,哪些变量的值是正确的,哪些变量的值是错误的,因此,需要一整套调试程序的手段来修复bug. 第一种方法简单直接粗暴有效,就是用print把可能有问题的变量打印出来看看: ? 1 2 3 4 5 6 7 8 9 10 # e

只需一行代码,轻松实现一个在线编辑器_javascript技巧

在大部分人眼里,技术宅给人的印象是沉默寡言,总摸不透他心里想些什么,彼此都保持距离.作为半个程序员,我觉得真正的技术宅大部分时间都在找乐子,鼓捣各种想法,和大部分人的极客心理是一样的,程序员也还爱讲笑话,也喜欢烧菜做饭,虽然大多是为了减减压,这样看来和常人没什么不一样. 不一样的地方,技术宅崇尚极致,喜欢极简,又希望简约不简单,背后就是技术宅满心思的不断的尝试,我正在看着一出好戏在上演: "程序员 Jose Jesus Perez Aguinaga 在 CoderWall 分享了一个小技巧:在浏

数据挖掘之Apriori算法详解和Python实现代码分享_python

关联规则挖掘(Association rule mining)是数据挖掘中最活跃的研究方法之一,可以用来发现事情之间的联系,最早是为了发现超市交易数据库中不同的商品之间的关系.(啤酒与尿布) 基本概念 1.支持度的定义:support(X-->Y) = |X交Y|/N=集合X与集合Y中的项在一条记录中同时出现的次数/数据记录的个数.例如:support({啤酒}-->{尿布}) = 啤酒和尿布同时出现的次数/数据记录数 = 3/5=60%. 2.自信度的定义:confidence(X-->

用一行代码解决CSS各种IE各种兼容问题

用一行代码来解决CSS在,IE6,IE7,IE8,IE9,IE10 中的各种兼容性问题. 在网站前端写代码的过程中,很多时间IE各个版本的兼容问题很难整.现在百度与谷歌都有了一行解决这种兼容性的代码了.如下面的. 办法一 百度也应用了这种方案去解决IE的兼容问题 百度源代码如下 1 <!Doctype html> 2 <htmlxmlns=http://www.w3.org/1999/xhtmlxmlns:bd=http://www.baidu.com/2010/xbdml> 3 

DELPHI DATASNAP 2010入门操作(2)不写一行代码,绿色三层我也行

没有一行代码的三层,功能肯定非常的简单,但是,再简单,我们也三层了,学习一 个东西,需要从入门开始就有兴趣,如果入门就给吓怕了,哪么后来何来信心学习呢? 现在就让我们开始吧,不只是没有华丽的词语,而是连用词都不专业,因为,我也只是 一个入门者. 我所用的版本为:Embarcadero Delphi 2010 Version 14.0.3593.25826 一.让我们 file->new->other ,再选 datasnap server ,或者F6,输入 datasnap

python实用代码片段收集贴

  这篇文章主要介绍了python实用代码片段收集贴,本文收集了如获取一个类的所有子类.计算运行时间.SQLAlchemy简单使用.实现类似Java或C中的枚举等实用功能代码,需要的朋友可以参考下 获取一个类的所有子类 复制代码 代码如下: def itersubclasses(cls, _seen=None): """Generator over all subclasses of a given class in depth first order.""

Ruby一行代码实现的快速排序

  这篇文章主要介绍了Ruby一行代码实现的快速排序,本文直接给出实现代码,超级简洁的一个的方法,需要的朋友可以参考下 代码如下: def quick_sort(a) return a if a.size < 2 (x = a.pop) ? quick_sort(a.select{|i| i <=x }) + [x] + quick_sort(a.select{|i| i > x}) : [] end array = [72,6,57,88,60,42,83,73,42,48,85] p

php中一行代码获取文件后缀名

 php中一行代码获取文件后缀名的方法要结合很多的函数了,我们这个有点像asp中的函数了,下面来一起看看吧. 实例: 代码如下   $filename = 'D:/wamp/www/sparkphp/rar'; $rs = strtolower(trim(substr(strrchr($filename, "."), 1))); 详解: strrchr()函数查找字符串在另一个字符串中最后一次出现的位置,并返回从该位置到字符串结尾的所有字符: substr()函数是返回字符串的一部分,

完成简单一行代码都不写的网页设计

  这是一篇由 Chris Kellett 为大家带来的教程,只需利用Ai CC 以及Muse CC,就可以在一行代码都不写的情况下,完成简单的网页设计和发布. 不过本文仅仅简单的介绍了大体的操作流程,更多细节还需诸位仔细研究一下. 01. 用AI进行基本布局 工作流程的第一步是使用AI CC,进行基本的布局,并且创建基本的矢量图标.完成后,在图层面板中选择释放到图层(顺序).这样就能将设计导出,以供PS编辑. 02.用PS完成设计 根据AI导出的图层,用PS进行视觉设计. 03. 命名图层 用