【Python】浅谈 multiprocessing

一前言 
   使用python进行并发处理多台机器/多个实例的时候,我们可以使用threading ,但是由于著名的GIL存在,实际上threading 并未提供真正有效的并发处理,要充分利用到多核CPU,我们需要使用多进程。Python提供了非常好用的多进程包--multiprocessing。multiprocessing 可以利用multiprocessing.Process对象来创建一个进程,该Process对象与Threading对象的用法基本相同,具有相同的方法(官方原话:"The multiprocessing package mostly replicates the API of the threading module.") 比如:start(),run(),join()的方法。multiprocessing包中也有Lock/Event/Semaphore/Condition/Pipe/Queue类用于进程之间的通信。话不多说 show me the code!

二使用
2.1 初识异同
下面的程序显示threading和multiprocessing的在使用方面的异同,相近的函数join(),start(),append() 等,并做同一件事情打印自己的进程pid

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. import os
  4. import threading
  5. import multiprocessing
  6. def printer(msg):
  7.     print(msg, os.getpid())
  8. print('Main begin:', os.getpid())
  9. # threading
  10. record = []
  11. for i in range(5):
  12.     thread = threading.Thread(target=printer, args=('threading',))
  13.     thread.start()
  14.     record.append(thread)
  15. for thread in record:
  16.     thread.join()
  17. # multi-process
  18. record = []
  19. for i in range(5):
  20.     process = multiprocessing.Process(target=printer, args=('multiprocessing',))
  21.     process.start()
  22.     record.append(process)
  23. for process in record:
  24.     process.join()
  25. print('Main end:', os.getpid())

输出结果

点击(此处)折叠或打开

  1. Main begin: 9524
  2. threading 9524
  3. threading 9524
  4. threading 9524
  5. threading 9524
  6. threading 9524
  7. multiprocessing 9539
  8. multiprocessing 9540
  9. multiprocessing 9541
  10. multiprocessing 9542
  11. multiprocessing 9543
  12. Main end: 9524

从例子的结果可以看出多线程threading的进程id和主进程(父进程)pid一样 ,同为9524; 多进程打印的pid每个都不一样,for循环中每创建一个process对象都年开一个进程。其他相关的方法基本类似。

2.2 用法
创建进程的类:
Process([group [, target [, name [, args [, kwargs]]]]]),
target表示调用对象,
args表示调用对象的位置参数元组。
kwargs表示调用对象的字典。
name为进程的别名。
group实质上不使用,为None。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程,并自动调用run方法.
属性:authkey、daemon(要通过start()设置,必须设置在方法start之前)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。

2.3 创建单进程
单线程比较简单,创建一个 Process的实例对象就好,传入参数 target 为已经定义好的方法worker以及worker需要的参数

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. """
  4. author: yangyi@youzan.com
  5. time: 2017/7/2 下午6:45
  6. func:
  7. """
  8. import multiprocessing
  9. import datetime, time
  10. def worker(interval):
  11.     print("process start: {0}".format(datetime.datetime.today()));
  12.     time.sleep(interval)
  13.     print("process end: {0}".format(datetime.datetime.today()));
  14. if __name__ == "__main__":
  15.     p = multiprocessing.Process(target=worker, args=(5,))
  16.     p.start()
  17.     p.join()
  18.     print "end!"

2.4 创建多进程

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. """
  4. author: yangyi@youzan.com
  5. time: 2017/7/2 下午7:50
  6. func:
  7. """
  8. import multiprocessing
  9. def worker(num):
  10.     print "worker %d" %num
  11. if __name__ == "__main__":
  12.     print("The number of CPU is:" + str(multiprocessing.cpu_count()))
  13.     proc = []
  14.     for i in xrange(5):
  15.         p = multiprocessing.Process(target=worker, args=(i,))
  16.         proc.append(p)
  17.     for p in proc:
  18.         p.start()
  19.     for p in proc:
  20.         p.join()
  21.     print "end ..."

输出

点击(此处)折叠或打开

  1. The number of CPU is:4
  2. worker 0
  3. worker 1
  4. worker 2
  5. worker 3
  6. worker 4
  7. main process end ...

2.5 线程池
multiprocessing提供进程池的类--Pool,它可以指定程序最大可以调用的进程数量,当有新的请求提交到pool中时,如果进程池还没有满,那么就会创建一个新的进程用来执行该请求;但如果进程池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
构造方法:
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes  : 使用的工作进程的数量,如果processes是None,默认使用os.cpu_count()返回的数量。
initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
context: 用在制定工作进程启动时的上下文,一般使用multiprocessing.Pool()或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。

实例方法:
  apply(func[, args[, kwds]]):同步进程池
  apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :异步进程池
  close() : 关闭进程池,阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
  terminate() : 结束工作进程,不在处理未完成的任务.
  join() : 等待工作线程的退出,在调用join()前必须调用close()或者 terminate(),因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. """
  4. author: yangyi@youzan.com
  5. time: 2017/7/2 下午7:50
  6. func:
  7. """
  8. from multiprocessing import Pool
  9. import time
  10. def worker(num):
  11.     print "worker %d" %num
  12.     time.sleep(2)
  13.     print "end worker %d" %num
  14. if __name__ == "__main__":
  15.     proc_pool = Pool(2)
  16.     for i in xrange(4):
  17.         proc_pool.apply_async(worker, (i,)) #使用了异步调用,从输出结果可以看出来
  18.     proc_pool.close()
  19.     proc_pool.join()
  20.     print "main process end ..."

输出结果

点击(此处)折叠或打开

  1. worker 0
  2. worker 1
  3. end worker 0
  4. end worker 1
  5. worker 2
  6. worker 3
  7. end worker 2
  8. end worker 3
  9. main process end ..

解释:创建一个进程池pool 对象proc_pool,并设定进程的数量为2,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为2,所以0、1会直接送到进程中执行,当其中的2个任务执行完之后才空出2进程处理对象2和3,所以会出现输出 worker 2 worker 3 出现在end worker 0 end worker 1之后。思考一下如果调用  proc_pool.apply(worker, (i,)) 的输出结果会是什么样的?

2.6 使用queue
multiprocessing提供队列类,可以通过调用multiprocessing.Queue(maxsize) 初始化队列对象,maxsize表示队列里面最多的元素个数。
例子 创建了两个函数入队,出队,出队处理时使用了lock特性,串行化取数据。

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. """
  4. author: yangyi@youzan.com
  5. time: 2017/7/2 下午9:03
  6. func:
  7. """
  8. import time
  9. from multiprocessing import Process, current_process,Lock,Queue
  10. import datetime
  11. def inputQ(queue):
  12.     time.sleep(1)
  13.     info = "proc_name: " + current_process().name + ' was putted in queue at: ' + str(datetime.datetime.today())
  14.     queue.put(info)
  15. def outputQ(queue,lock):
  16.     info = queue.get()
  17.     lock.acquire()
  18.     print ("proc_name: " + current_process().name + ' gets info :' + info)
  19.     lock.release()
  20. if __name__ == '__main__':
  21.     record1 = [] # store input processes
  22.     record2 = [] # store output processes
  23.     lock = Lock() # To prevent messy print
  24.     queue = Queue(3)
  25.     for i in range(10):
  26.         process = Process(target=inputQ, args=(queue,))
  27.         process.start()
  28.         record1.append(process)
  29.     for i in range(10):
  30.         process = Process(target=outputQ, args=(queue,lock))
  31.         process.start()
  32.         record2.append(process)
  33.     for p in record1:
  34.         p.join()
  35.     queue.close() # No more object will come, close the queue
  36.     for p in record2:
  37.         p.join()

2.7 使用pipe 
Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。
用法 multiprocessing.Pipe([duplex])
该类返回一组对象实例(conn1, conn2),分别代表发送和接受消息的两端。

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. """
  4. author: yangyi@youzan.com
  5. time: 2017/7/2 下午8:01
  6. func:
  7. """
  8. from multiprocessing import Process, Pipe
  9. def p1(conn, name):
  10.     conn.send('hello ,{name}'.format(name=name))
  11.     print "p1 receive :", conn.recv()
  12.     conn.close()
  13. def p2(conn, name):
  14.     conn.send('hello ,{name}'.format(name=name))
  15.     print "p2 receive :", conn.recv()
  16.     conn.close()
  17. if __name__ == '__main__':
  18.     parent_conn, child_conn = Pipe()
  19.     proc1 = Process(target=p1, args=(child_conn, "parent_conn"))
  20.     proc2 = Process(target=p2, args=(parent_conn, "child_conn"))
  21.     proc1.start()
  22.     proc2.start()
  23.     proc1.join()
  24.     proc2.join()

输出:

点击(此处)折叠或打开

  1. p1 receive : hello ,child_conn
  2. p2 receive : hello ,parent_conn

该例子中 p1 p2 通过pipe 给彼此相互发送信息,p1 发送"parent_conn" 给 p2 ,p2 发送"child_conn" 给p1.
2.8 daemon程序对比结果

  1. import multiprocessing
  2. import datetime, time
  3. def worker(interval):
  4.     print("process start: {0}".format(datetime.datetime.today()));
  5.     time.sleep(interval)
  6.     print("process end: {0}".format(datetime.datetime.today()));
  7. if __name__ == "__main__":
  8.     p = multiprocessing.Process(target=worker, args=(5,))
  9.     p.start()
  10.     print "end!"

输出:

点击(此处)折叠或打开

  1. end!
  2. process start: 2017-07-02 18:47:30.656244
  3. process end: 2017-07-02 18:47:35.657464

设置 daemon = True,程序随着主程序结束而不等待子进程。

  1. import multiprocessing
  2. import datetime, time
  3. def worker(interval):
  4.     print("process start: {0}".format(datetime.datetime.today()));
  5.     time.sleep(interval)
  6.     print("process end: {0}".format(datetime.datetime.today()));
  7. if __name__ == "__main__":
  8.     p = multiprocessing.Process(target=worker, args=(5,))
  9.     p.daemon = True
  10.     p.start()
  11.     print "end!"

输出:
end!
因为子进程设置了daemon属性,主进程结束,multiprocessing创建的进程对象就随着结束了。

  1. import multiprocessing
  2. import datetime, time
  3. def worker(interval):
  4.     print("process start: {0}".format(datetime.datetime.today()));
  5.     time.sleep(interval)
  6.     print("process end: {0}".format(datetime.datetime.today()));
  7. if __name__ == "__main__":
  8.     p = multiprocessing.Process(target=worker, args=(5,))
  9.     p.daemon = True  #
  10.     p.start()
  11.     p.join() #进程执行完毕后再关闭
  12.     print "end!"

输出:

点击(此处)折叠或打开

  1. process start: 2017-07-02 18:48:20.953754
  2. process end: 2017-07-02 18:48:25.954736

2.9 Lock()
当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。
实例方法:
acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。
release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。
例子:
多个进程使用同一个std_out ,使用lock机制确保同一个时刻有一个一个进程获取输出。

  1. #!/usr/bin/env python
    # encoding: utf-8
    """
    author: yangyi@youzan.com
    time: 2017/7/2 下午9:28
    func: 
    """
    from multiprocessing import Process, Lock
    def func_with_lock(l, i):
        l.acquire()
        print 'hello world', i
        l.release()

    def func_without_lock(i):
        print 'hello world', i

    if __name__ == '__main__':
        lock = Lock()
        print "func_with_lock :"
        for num in range(10):
            Process(target=func_with_lock, args=(lock, num)).start()

输出:

点击(此处)折叠或打开

  1. func_with_lock :
  2. hello world 0
  3. hello world 1
  4. hello world 2
  5. hello world 3
  6. hello world 4
  7. hello world 5
  8. hello world 6
  9. hello world 7
  10. hello world 8
  11. hello world 9

三 小结
 本文参考官方资料以及其他资源,对multiprocesssing 的使用方式做了总结,还有很多知识需要详细阅读官方文档。纸上来得终觉浅,绝知此事要躬行。参考资料
[1]官方文档 
[2]Python标准库10 多进程初步 (multiprocessing包)

时间: 2024-10-03 06:51:48

【Python】浅谈 multiprocessing的相关文章

浅谈Python中copy()方法的使用

  这篇文章主要介绍了浅谈Python中copy()方法的使用,Python中的拷贝分为潜拷贝和深拷贝,本文只是简单介绍用法,需要的朋友可以参考下 copy()方法返回字典的浅拷贝. 语法 以下是copy()方法的语法: ? 1 dict.copy() 参数 NA 返回值 此方法返回字典的浅拷贝. 例子 下面的例子显示了copy()方法的使用. ? 1 2 3 4 5 6 #!/usr/bin/python   dict1 = {'Name': 'Zara', 'Age': 7};   dict

浅谈Python Web的五大框架

说到Web Framework,Ruby的世界Rails一统江湖,而Python则是一个百花齐放的世界,各种micro-framework.framework不可胜数,不完全列表见: http://wiki.python.org/moin/WebFrameworks. 虽然另一大脚本语言PHP也有不少框架,但远没有Python这么夸张,也正是因为Python Web Framework(Python Web开发框架,以下简称Python框架)太多,所以在Python社区总有关于Python框架孰

浅谈python字符串方法的简单使用_python

学习python字符串方法的使用,对书中列举的每种方法都做一个试用,将结果记录,方便以后查询. (1) s.capitalize() ;功能:返回字符串的的副本,并将首字母大写.使用如下: >>> s = 'wwwwww' >>> scap = s.capitalize() >>> scap 'Wwwwww' (2)s.center(width,char); 功能:返回将s字符串放在中间的一个长度为width的字符串,默认其他部分用空格填充,否则使用c

浅谈Python 对象内存占用_python

一切皆是对象 在 Python 一切皆是对象,包括所有类型的常量与变量,整型,布尔型,甚至函数. 参见stackoverflow上的一个问题 Is everything an object in python like ruby 代码中即可以验证: # everythin in python is object def fuction(): return print isinstance(True, object) print isinstance(0, object) print isinst

浅谈python类属性的访问、设置和删除方法_python

类属性和对象属性 我们把定义在类中的属性称为类属性,该类的所有对象共享类属性,类属性具有继承性,可以为类动态地添加类属性. 对象在创建完成后还可以为它添加额外的属性,我们把这部分属性称为对象属性,对象属性仅属于该对象,不具有继承性. 类属性和对象属性都会被包含在dir()中,而vars()是仅包含对象属性.vars()跟__dict__是等同的. 类属性和对象属性可类比于Java中的static成员和非static成员,只不python中的类属性和对象属性都是可以动态添加(和删除)的. clas

浅谈Python程序与C++程序的联合使用_python

作为Python程序员,应该能够正视Python的优点与缺点.众所周之,Python的运行速度是很慢的,特别是大数据量的运算时,Python会慢得让人难以忍受.对于这种情况,"专业"的解决方案是用上numpy或者opencl.不过有时候为了一点小功能用上这种重型的解决方案很不划算,或者有时候想要实现的操作在numpy里面没有,需要我们自己用C语言来编写.总之,我们使用Python与C++的混合编程能够加快程序热点的运算速度. 首先要提醒大家注意的是,在考虑联合编程之前一定要找到程序运行

浅谈Python类里的__init__方法函数,Python类的构造函数_python

如果某类里没有__init__方法函数,通过类名字创建的实例对象为空,切没有初始化:如果有此方法函数,通常作为类的第一个方法函数,有点像C++等语言里的构造函数. class Ca: def __init__(self, v): # 注意前后各两个下划线 self.name = v def pr(self): print "a--->", self.name ia = Ca("Jeapedu") # 本质调用的是__init__方法函数 ia.pr() Ca.

浅谈Python爬取网页的编码处理_python

背景 中秋的时候一个朋友给我发了一封邮件说他在爬链家的时候发现网页返回的代码都是乱码让我帮他参谋参谋(中秋加班真是敬业= =)其实这个问题我很早就遇到过之前在爬小说的时候稍微看了一下不过没当回事其实这个问题就是对编码的理解不到位导致的. 问题 很普通的一个爬虫代码代码是这样的 # ecoding=utf-8 import re import requests import sys reload(sys) sys.setdefaultencoding('utf8') url = 'http://j

浅谈 LLDB 调试器 - 如果这叫浅谈,那深得到啥样了呢!

浅谈 LLDB 调试器 2015-01-26 11:34 编辑: z_zombie 分类:iOS开发 来源:南峰子的技术博客 随着Xcode 5的发布,LLDB调试器已经取代了GDB,成为了Xcode工程中默认的调试器.它与LLVM编译器一起,带给我们更丰富的流程控制和数据检测的调试功能.LLDB为Xcode提供了底层调试环境,其中包括内嵌在Xcode IDE中的位于调试区域的控制面板,在这里我们可以直接调用LLDB命令.如图1所示: 图1:位于Xcode调试区域的控制台 在本文中,我们主要整理