python进程池详细剖析教程

python中两个常用来处理进程的模块分别是subprocess和multiprocessing,其中subprocess通常用于执行外部程序,比如一些第三方应用程序,而不是Python程序。如果需要实现调用外部程序的功能,python的psutil模块是更好的选择,它不仅支持subprocess提供的功能,而且还能对当前主机或者启动的外部程序进行监控,比如获取网络、cpu、内存等信息使用情况,在做一些自动化运维工作时支持的更加全面。multiprocessing是python的多进程模块,主要通过启动python进程,调用target回调函数来处理任务,与之对应的是python的多线程模块threading,它们拥有类似的接口,通过定义multiprocessing.Process、threading.Thread,指定target方法,调用start()运行进程或者线程。

  在python中由于全局解释锁(GIL)的存在,使用多线程,并不能大大提高程序的运行效率【1】。因此,用python处理并发问题时,尽量使用多进程而非多线程。并发编程中,最简单的模式是,主进程等待任务,当有新任务到来时,启动一个新的进程来处理当前任务。这种每个任务一个进程的处理方式,每处理一个任务都会伴随着一个进程的创建、运行、销毁,如果进程的运行时间越短,创建和销毁的时间所占的比重就越大,显然,我们应该尽量避免创建和销毁进程本身的额外开销,提高进程的运行效率。我们可以用进程池来减少进程的创建和开销,提高进程对象的复用。

  实际上,python中已经实现了一个功能强大的进程池(multiprocessing.Pool),这里我们来简单剖析下python自带的进程池是如何实现的。

  要创建进程池对象,需要调用Pool函数,函数的声明如下:

Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)
    Returns a process pool object
processes表示工作进程的个数,默认为None,表示worker进程数为cpu_count()
initializer表示工作进程start时调用的初始化函数,initargs表示initializer函数的参数,如果initializer不为None,在每个工作进程start之前会调用initializer(*initargs)
maxtaskperchild表示每个工作进程在退出/被其他新的进程替代前,需要完成的工作任务数,默认为None,表示工作进程存活时间与pool相同,即不会自动退出/被替换。
函数返回一个进程池(Pool)对象

  Pool函数返回的进程池对象中有下面一些数据结构:

self._inqueue  接收任务队列(SimpleQueue),用于主进程将任务发送给worker进程
self._outqueue  发送结果队列(SimpleQueue),用于worker进程将结果发送给主进程
self._taskqueue  同步的任务队列,保存线程池分配给主进程的任务
self._cache = {}  任务缓存
self._processes  worker进程个数
self._pool = []  woker进程队列

  进程池工作时,任务的接收、分配。结果的返回,均由进程池内部的各个线程合作完成,来看看进程池内部由那些线程:

    _work_handler线程,负责保证进程池中的worker进程在有退出的情况下,创建出新的worker进程,并添加到进程队列(pools)中,保持进程池中的worker进程数始终为processes个。_worker_handler线程回调函数为Pool._handler_workers方法,在进程池state==RUN时,循环调用_maintain_pool方法,监控是否有进程退出,并创建新的进程,append到进程池pools中,保持进程池中的worker进程数始终为processes个。
    

    self._worker_handler = threading.Thread(
                target=Pool._handle_workers,
                args=(self, )
    )
    Pool._handle_workers方法在_worker_handler线程状态为运行时(status==RUN),循环调用_maintain_pool方法:
    def _maintain_pool(self):
        if self._join_exited_workers():
            self._repopulate_pool()
    _join_exited_workers()监控pools队列中的进程是否有结束的,有则等待其结束,并从pools中删除,当有进程结束时,调用_repopulate_pool(),创建新的进程:
    w = self.Process(target=worker,
                    args=(self._inqueue, self._outqueue,
                          self._initializer, self._initargs,                 
                           self._maxtasksperchild)
                     )
    self._pool.append(w)

    w是新创建的进程,它是用来处理实际任务的进程,worker是它的回调函数:

    def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
        assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
        put = outqueue.put
        get = inqueue.get
        if hasattr(inqueue, '_writer'):
            inqueue._writer.close()
            outqueue._reader.close()
        if initializer is not None:
            initializer(*initargs)
        completed = 0
        while maxtasks is None or (maxtasks and completed < maxtasks):
            try:
                task = get()
            except (EOFError, IOError):
                debug('worker got EOFError or IOError -- exiting')
                break
            if task is None:
                debug('worker got sentinel -- exiting')
                break
            job, i, func, args, kwds = task
            try:
                result = (True, func(*args, **kwds))
            except Exception, e:
                result = (False, e)
            try:
                put((job, i, result))
            except Exception as e:
                wrapped = MaybeEncodingError(e, result[1])
                debug("Possible encoding error while sending result: %s" % (
                    wrapped))
                put((job, i, (False, wrapped)))
            completed += 1
        debug('worker exiting after %d tasks' % completed)

    所有worker进程都使用worker回调函数对任务进行统一的处理,从源码中可以看出:
    它的功能是从接入任务队列中(inqueue)读取出task任务,然后根据任务的函数、参数进行调用(result = (True, func(*args, **kwds),
    再将结果放入结果队列中(outqueue),如果有最大处理上限的限制maxtasks,那么当进程处理到任务数上限时退出。

    
    _task_handler线程,负责从进程池中的task_queue中,将任务取出,放入接收任务队列(Pipe),

    self._task_handler = threading.Thread(
                target=Pool._handle_tasks,
                args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
    )
    Pool._handle_tasks方法不断从task_queue中获取任务,并放入接受任务队列(in_queue),以此触发worker进程进行任务处理。当从task_queue读取到None元素时,
    表示进程池将要被终止(terminate),不再处理之后的任务请求,同时向接受任务队列和结果任务队列put None元素,通知其他线程结束。

    _handle_results线程,负责将处理完的任务结果,从outqueue(Pipe)中读取出来,放在任务缓存cache中,

    self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
    )

    _terminate,这里的_terminate并不是一个线程,而是一个Finalize对象

    

    self._terminate = Finalize(
                self, self._terminate_pool,
                args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                      self._worker_handler, self._task_handler,
                      self._result_handler, self._cache),
                exitpriority=15
    )
    Finalize类的构造函数与线程构造函数类似,_terminate_pool是它的回调函数,args回调函数的参数。
    _terminate_pool函数负责终止进程池的工作:终止上述的三个线程,终止进程池中的worker进程,清除队列中的数据。
    _terminate是个对象而非线程,那么它如何像线程调用start()方法一样,来执行回调函数_terminate_pool呢?查看Pool源码,发现进程池的终止函数:
    def terminate(self):
        debug('terminating pool')
        self._state = TERMINATE
        self._worker_handler._state = TERMINATE
        self._terminate()
    函数中最后将_terminate对象当做一个方法来执行,而_terminate本身是一个Finalize对象,我们看一下Finalize类的定义,发现它实现了__call__方法:
    def __call__(self, wr=None):
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            if self._pid != os.getpid():
                res = None
            else:
                res = self._callback(*self._args, **self._kwargs)
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res
    而方法中 self._callback(*self._args, **self._kwargs) 这条语句,就执行了_terminate_pool函数,进而将进程池终止。

   
  进程池中的数据结构、各个线程之间的合作关系如下图所示:

  【1】这里针对的是CPU密集型程序,多线程并不能带来效率上的提升,相反还可能会因为线程的频繁切换,导致效率下降;如果是IO密集型,多线程进程可以利用IO阻塞等待时的空闲时间执行其他线程,提升效率。

我们知道,当进程池中任务队列非空时,才会触发worker进程去工作,那么如何向进程池中的任务队列中添加任务呢,进程池类有两组关键方法来创建任务,分别是apply/apply_async和map/map_async,实际上进程池类的apply和map方法与python内建的两个同名方法类似,apply_async和map_async分别为它们的非阻塞版本。

  首先来看apply_async方法,源码如下:

def apply_async(self, func, args=(), kwds={}, callback=None):
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result

func表示执行此任务的方法
args、kwds分别表func的位置参数和关键字参数
callback表示一个单参数的方法,当有结果返回时,callback方法会被调用,参数即为任务执行后的结果

  每调用一次apply_result方法,实际上就向_taskqueue中添加了一条任务,注意这里采用了非阻塞(异步)的调用方式,即apply_async方法中新建的任务只是被添加到任务队列中,还并未执行,不需要等待,直接返回创建的ApplyResult对象,注意在创建ApplyResult对象时,将它放入进程池的缓存_cache中。

  任务队列中有了新创建的任务,那么根据上节分析的处理流程,进程池的_task_handler线程,将任务从taskqueue中获取出来,放入_inqueue中,触发worker进程根据args和kwds调用func,运行结束后,将结果放入_outqueue,再由进程池中的_handle_results线程,将运行结果从_outqueue中取出,并找到_cache缓存中的ApplyResult对象,_set其运行结果,等待调用端获取。

  apply_async方法既然是异步的,那么它如何知道任务结束,并获取结果呢?这里需要了解ApplyResult类中的两个主要方法:

def get(self, timeout=None):
    self.wait(timeout)
    if not self._ready:
        raise TimeoutError
    if self._success:
        return self._value
    else:
        raise self._value
def _set(self, i, obj):
    self._success, self._value = obj
    if self._callback and self._success:
        self._callback(self._value)
    self._cond.acquire()
    try:
        self._ready = True
        self._cond.notify()
    finally:
        self._cond.release()
    del self._cache[self._job]

从这两个方法名可以看出,get方法是提供给客户端获取worker进程运行结果的,而运行的结果是通过_handle_result线程调用_set方法,存放在ApplyResult对象中。
_set方法将运行结果保存在ApplyResult._value中,唤醒阻塞在条件变量上的get方法。客户端通过调用get方法,返回运行结果。

  apply方法是以阻塞的方式运行获取进程结果,它的实现很简单,同样是调用apply_async,只不过不返回ApplyResult,而是直接返回worker进程运行的结果:

def apply(self, func, args=(), kwds={}):
        assert self._state == RUN
        return self.apply_async(func, args, kwds).get()

  以上的apply/apply_async方法,每次只能向进程池分配一个任务,那如果想一次分配多个任务到进程池中,可以使用map/map_async方法。首先来看下map_async方法是如何定义的:

def map_async(self, func, iterable, chunksize=None, callback=None):
    assert self._state == RUN
    if not hasattr(iterable, '__len__'):
        iterable = list(iterable)
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
        if extra:
            chunksize += 1
        if len(iterable) == 0:
            chunksize = 0
    task_batches = Pool._get_tasks(func, iterable, chunksize)
    result = MapResult(self._cache, chunksize, len(iterable), callback)
    self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                              for i, x in enumerate(task_batches)), None))
    return result

func表示执行此任务的方法
iterable表示任务参数序列
chunksize表示将iterable序列按每组chunksize的大小进行分割,每个分割后的序列提交给进程池中的一个任务进行处理
callback表示一个单参数的方法,当有结果返回时,callback方法会被调用,参数即为任务执行后的结果

   从源码可以看出,map_async要比apply_async复杂,首先它会根据chunksize对任务参数序列进行分组,chunksize表示每组中的任务个数,当默认chunksize=None时,根据任务参数序列和进程池中进程数计算分组数:chunk, extra = divmod(len(iterable), len(self._pool) * 4)。假设进程池中进程数为len(self._pool)=4,任务参数序列iterable=range(123),那么chunk=7, extra=11,向下执行,得出chunksize=8,表示将任务参数序列分为8组。任务实际分组:

task_batches = Pool._get_tasks(func, iterable, chunksize)
def _get_tasks(func, it, size):
    it = iter(it)
    while 1:
        x = tuple(itertools.islice(it, size))
        if not x:
            return
        yield (func, x)

这里使用yield将_get_tasks方法编译成生成器。实际上对于range(123)这样的序列,按照chunksize=8进行分组后,一共16组每组的元素如下:
(func, (0,   1,   2,   3,   4,   5,   6,   7))
(func, (8,   9,   10,  11,  12,  13,  14,  15))
(func, (16,  17,  18,  19,  20,  21,  22,  23))
...
(func, (112, 113, 114, 115, 116, 117, 118, 119))
(func, (120, 121, 122))

  分组之后,这里定义了一个MapResult对象:result = MapResult(self._cache, chunksize, len(iterable), callback)它继承自AppyResult类,同样提供get和_set方法接口。将分组后的任务放入任务队列中,然后就返回刚刚创建的result对象。

self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                              for i, x in enumerate(task_batches)), None))
以任务参数序列=range(123)为例,实际上这里向任务队列中put了一个16组元组元素的集合,元组依次为:
(result._job, 0, mapstar, ((func, (0,   1,   2,   3,   4,   5,   6,   7)),), {}, None)
(result._job, 1, mapstar, ((func, (8,   9,   10,  11,  12,  13,  14,  15)),), {}, None)
……
(result._job, 15, mapstar, ((func, (120, 121, 122)),), {}, None)
注意每一个元组中的 i,它表示当前元组在整个任务元组集合中的位置,通过它,_handle_result线程才能将worker进程运行的结果,以正确的顺序填入到MapResult对象中。

  注意这里只调用了一次put方法,将16组元组作为一个整体序列放入任务队列,那么这个任务是否_task_handler线程是否也会像apply_async方法一样,将整个任务序列传递给_inqueue,这样就会导致进程池中的只有一个worker进程获取到任务序列,而并非起到多进程的处理方式。我们来看下_task_handler线程是怎样处理的:

def _handle_tasks(taskqueue, put, outqueue, pool, cache):
    thread = threading.current_thread()
    for taskseq, set_length in iter(taskqueue.get, None):
        i = -1
        for i, task in enumerate(taskseq):
            if thread._state:
                debug('task handler found thread._state != RUN')
                break
            try:
                put(task)
            except Exception as e:
                job, ind = task[:2]
                try:
                    cache[job]._set(ind, (False, e))
                except KeyError:
                    pass
        else:
            if set_length:
                debug('doing set_length()')
                set_length(i+1)
            continue
        break
    else:
        debug('task handler got sentinel')

  注意到语句 for i, task in enumerate(taskseq),原来_task_handler线程在通过taskqueue获取到任务序列后,并不是直接放入_inqueue中的,而是将序列中任务按照之前分好的组,依次放入_inqueue中的,而循环中的task即上述的每个任务元组:(result._job, 0, mapstar, ((func, (0,   1,   2,   3,   4,   5,   6,   7)),), {}, None)。接着触发worker进程。worker进程获取出每组任务,进行任务的处理:

job, i, func, args, kwds = task 
try:   
    result = (True, func(*args, **kwds))
except Exception, e:
    result = (False, e)
try:
    put((job, i, result))
except Exception as e:
    wrapped = MaybeEncodingError(e, result[1])
    debug("Possible encoding error while sending result: %s" % (
        wrapped))
    put((job, i, (False, wrapped)))

根据之前放入_inqueue的顺序对应关系:
(result._job, 0, mapstar, ((func, (0,   1,   2,   3,   4,   5,   6,   7)),), {}, None)
job, i, func, args, kwds = task
可以看出,元组中 mapstar 表示这里的回调函数func,((func, (0, 1, 2, 3, 4, 5, 6, 7)),)和{}分别表示args和kwds参数。
执行result = (True, func(*args, **kwds))
再来看下mapstar是如何定义的:
def mapstar(args):
return map(*args)
这里mapstar表示回调函数func,它的定义只有一个参数,而在worker进程执行回调时,使用的是func(*args, **kwds)语句,这里多一个参数能够正确执行吗?答案时肯定的,在调用mapstar时,如果kwds为空字典,那么传入第二个参数不会影响函数的调用,而一个无参函数func_with_none_params,在调用时使用func_with_none_params(*(), **{})也是没有问题的,python会自动忽视传入的两个空参数。
看到这里,我们明白了,实际上对任务参数分组后,每一组的任务是通过内建的map方法来进行调用的。
运行之后调用put(job, i, result)将结果放入_outqueue中,_handle_result线程会从_outqueue中将结果取出,并找到_cache缓存中的MapResult对象,_set其运行结果

  现在来我们来总结下,进程池的map_async方法是如何运行的,我们将range(123)这个任务序列,将它传入map_async方法,假设不指定chunksize,并且cpu为四核,那么方法内部会分为16个组(0~14组每组8个元素,最后一组3个元素)。将分组后的任务放入任务队列,一共16组,那么每个进程需要运行4次来处理,每次通过内建的map方法,顺序将组中8个任务执行,再将结果放入_outqueue,找到_cache缓存中的MapResult对象,_set其运行结果,等待客户端获取。使用map_async方法会调用多个worker进程处理任务,每个worler进程运行结束,会将结果传入_outqueue,再有_handle_result线程将结果写入MapResult对象,那如何保证结果序列的顺序与调用map_async时传入的任务参数序列一致呢,我们来看看MapResult的构造函数和_set方法的实现。

def __init__(self, cache, chunksize, length, callback):
    ApplyResult.__init__(self, cache, callback)
    self._success = True
    self._value = [None] * length
    self._chunksize = chunksize
    if chunksize <= 0:
        self._number_left = 0
        self._ready = True
        del cache[self._job]
    else:
        self._number_left = length//chunksize + bool(length % chunksize)
def _set(self, i, success_result):
    success, result = success_result
    if success:
        self._value[i*self._chunksize:(i+1)*self._chunksize] = result
        self._number_left -= 1
        if self._number_left == 0:
            if self._callback:
                self._callback(self._value)
            del self._cache[self._job]
            self._cond.acquire()
            try:
                self._ready = True
                self._cond.notify()
            finally:
                self._cond.release()
    else:
        self._success = False
        self._value = result
        del self._cache[self._job]
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()

   MapResult类中,_value保存map_async的运行结果,初始化时为一个元素为None的list,list的长度与任务参数序列的长度相同,_chunksize表示将任务分组后,每组有多少个任务,_number_left表示整个任务序列被分为多少个组。_handle_result线程会通过_set方法将worker进程的运行结果保存到_value中,那么如何将worker进程运行的结果填入到_value中正确的位置呢,还记得在map_async在向task_queue填入任务时,每组中的 i吗,i表示的就是当前任务组的组号,_set方法会根据当前任务的组号即参数 i,并且递减_number_left,当_number_left递减为0时,表示任务参数序列中的所有任务都已被woker进程处理,_value全部被计算出,唤醒阻塞在get方法上的条件变量,是客户端可以获取运行结果。

  map函数为map_async的阻塞版本,它在map_async的基础上,调用get方法,直接阻塞到结果全部返回:

def map(self, func, iterable, chunksize=None):
    assert self._state == RUN
    return self.map_async(func, iterable, chunksize).get()

  这节我们主要分析了两组向进程池分配任务的接口:apply/apply_async和map/map_async。apply方法每次处理一个任务,不同任务的执行方法(回调函数)、参数可以不同,而map方法每次可以处理一个任务序列,每个任务的执行方法相同。

现在让我们回头看看标准库中对进程池的实现都有哪些值得我们学习的地方。我们知道,进程池内部由多个线程互相协作,向客户端提供可靠的服务,那么这些线程之间是怎样做到数据共享与同步的呢?在客户端使用apply/map函数向进程池分配任务时,使用self._taskqueue来存放任务元素,_taskqueue定义为Queue.Queue(),这是一个python标准库中的线程安全的同步队列,它保证通知时刻只有一个线程向队列添加或从队列获取元素。这样,主线程向进程池中分配任务(taskqueue.put),进程池中_handle_tasks线程读取_taskqueue队列中的元素,两个线程同时操作taskqueue,互不影响。进程池中有N个worker进程在等待任务下发,那么进程池中的_handle_tasks线程读取出任务后,又如何保证一个任务不被多个worker进程获取到呢?我们来看下_handle_tasks线程将任务读取出来之后如何交给worker进程的:

for taskseq, set_length in iter(taskqueue.get, None):
    i = -1
    for i, task in enumerate(taskseq):
        if thread._state:
            debug('task handler found thread._state != RUN')
            break
        try:
            put(task)
        except Exception as e:
            job, ind = task[:2]
            try:
                cache[job]._set(ind, (False, e))
            except KeyError:
                pass
    else:
        if set_length:
            debug('doing set_length()')
            set_length(i+1)
        continue
    break
else:
    debug('task handler got sentinel')

在从taskqueue中get到任务之后,对任务中的每个task,调用了put函数,这个put函数实际上是将task放入了管道,而主进程与worker进程的交互,正是通过管道来完成的。
再来看看worker进程的定义:
w = self.Process(target=worker,
                 args=(self._inqueue, self._outqueue,
                         self._initializer,
                   self._initargs, self._maxtasksperchild)
            )
其中self._inqueue和self._outqueue为SimpleQueue()对象,实际是带锁的管道,上述_handle_task线程调用的put函数,即为SimpleQueue对象的方法。我们看到,这里worker进程定义均相同,所以进程池中的worker进程共享self._inqueue和self._outqueue对象,那么当一个task元素被put到共享的_inqueue管道中时,如何确保只有一个worker获取到呢,答案同样是加锁,在SimpleQueue()类的定义中,put以及get方法都带有锁,进行同步,唯一不同的是,这里的锁是用于进程间同步的。这样就保证了多个worker之间能够确保任务的同步。与分配任务类似,在worker进程运行完之后,会将结果put会_outqueue,_outqueue同样是SimpleQueue类对象,可以在多个进程之间进行互斥。

  在worker进程运行结束之后,会将执行结果通过管道传回,进程池中有_handle_result线程来负责接收result,取出之后,通过调用_set方法将结果写回ApplyResult/MapResult对象,客户端可以通过get方法取出结果,这里通过使用条件变量进行同步,当_set函数执行之后,通过条件变量唤醒阻塞在get函数的主进程。

  进程池终止工作通过调用Pool.terminate()来实现,这里的实现很巧妙,用了一个可调用对象,将终止Pool时的需要执行的回调函数先注册好,等到需要终止时,直接调用对象即可。

self._terminate = Finalize(
                self, self._terminate_pool,
                args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                   self._worker_handler, self._task_handler,
                   self._result_handler, self._cache),
                exitpriority=15
            )
在Finalize类的实现了__call__方法,在运行self._terminate()时,就会调用构造self._terminate时传入的self._terminate_pool对象。

  使用map/map_async函数向进程池中批量分配任务时,使用了生成器表达式:

self._taskqueue.put((((result._job, i, mapstar, (x,), {}) for i, x in enumerate(task_batches)), None))
生成器表达式很简单,只需把列表解析的的[]换成()即可,上述表达的列表解析表示为:
[(result._job, i, mapstar, (x,), {}) for i, x in enumerate(task_batches)]
这里使用生成器表达式的好处是,它相当于列表解析的扩展,是对内存有好的,因为它只是生成了一个生成器,当我们需要使用该生成器对应的逻辑目标数据时,它才会通过既定逻辑去生成该数据,所以不会大量占用内存。

  在Pool中,_worker_handler线程负责监控、创建新的工作进程,在监控工作进程退出时,同时将退出的进程从进程池中删除掉。这类似于,一边遍历一边删除列表。我们来看下下面代码的实现:

>>> l = [1, 2, 3, 3, 4, 4, 4, 5]
>>> for i in l:
    if i in [3, 4, 5]:
        l.remove(i)

        
>>> l
[1, 2, 3, 4, 5]

我们看到l没有将所有的3和4都删除掉,这是因为remove改变了l的大小。再看下面的实现:

>>> l = [1, 2, 3, 3, 4, 4, 4, 5]
>>> for i in range(len(l)):
    if l[i] in [3, 4]:
        del l[i]

        

Traceback (most recent call last):
  File "<pyshell#37>", line 2, in <module>
    if l[i] in [3, 4]:
IndexError: list index out of range
>>>

同样因为del l[i]时,l的大小改变,继续访问下去导致访问越界。而标准库中的进程池给出了遍历删除的一个正确示例:

for i in reversed(range(len(self._pool))):
    worker = self._pool[i]
    if worker.exitcode is not None:
        worker.join()
        cleaned = True
        del self._pool[i]

使用reversed,从后向前删除list中的元素,这样会保证所有符合删除条件的元素被删除掉:

>>> l = [1, 2, 3, 3, 4, 4, 4, 5]
>>> for i in reversed(range(len(l))):
    if l[i] in [3, 4, 5]:
        del l[i]

        
>>> l
[1, 2]

  可以看出,一个篇幅并不算大的Pool模块,就有很多值得学习的地方。对于python亦或者其他语言,技能的提升,多阅读标准库中代码,是一个很不错的选择。对于我们经常使用,而不知其中实现奥秘的源码,多阅读源码,了解其技术实现,就像侯捷那本《STL源码剖析》中讲到的,源码之前,了无秘密。更重要的是,将这些漂亮而又高效的编码方式,运用在自己的工作中,让自己的代码也可以像标准库中的代码一样优雅,这可以说是每一个开发人员的追求。

以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索线程
, 多线程
, 函数
, 程序
, 模块
进程
python源码剖析、python源码剖析 pdf、python源码剖析 高清、python源码剖析 mobi、python源代码剖析,以便于您获取更多的相关知识。

时间: 2024-09-10 04:02:07

python进程池详细剖析教程的相关文章

python 进程池失效原因

问题描述 python 进程池失效原因 from multiprocessing import Pool import os, time, random def process_pool_program_(): def long_time_task(name): print 'Run task %s (%s)...' % (name, os.getpid()) start = time.time() time.sleep(random.random() * 3) end = time.time(

Python自定义进程池实例分析【生产者、消费者模型问题】_python

本文实例分析了Python自定义进程池.分享给大家供大家参考,具体如下: 代码说明一切: #encoding=utf-8 #author: walker #date: 2014-05-21 #function: 自定义进程池遍历目录下文件 from multiprocessing import Process, Queue, Lock import time, os #消费者 class Consumer(Process): def __init__(self, queue, ioLock):

Python并发编程之线程池/进程池

引言 Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码,但是当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我们就要编写自己的线程池/进程池,以空间换时间.但从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutor和ProcessPoolExecutor两个类,实现了对threading和multiprocessing的进一步抽象,对

使用python玩跳一跳超详细使用教程

在上一篇文章里介绍了使用Python玩微信跳一跳的详细使用教程,不过依旧还是有很多小伙伴有各种各样的问题.为了让各位小伙伴都能使用黑科技,于是再做一个超详细教程.从Python的安装开始,手把手教你一步一步配置好环境直到成功运行. 本文首发于http://www.52aite.cn博客,没有知乎,没有微信公众号,只是蹭一波python跳一跳的热度. python辅助作者github账号为:wangshub. 作者的知乎专栏为:https://zhuanlan.zhihu.com/p/324524

PYTHON多进程编码结束之进程池POOL

结束昨晚开始的测试. 最后一个POOL. A,使用POOL的返回结果 #coding: utf-8 import multiprocessing import time def func(msg): print 'msg:', msg time.sleep(3) print 'end' return 'done', msg if __name__ == '__main__': pool = multiprocessing.Pool(processes=3) result = [] for i i

Lamp环境的详细安装教程

原文:Lamp环境的详细安装教程 架构LAMP环境   1.布置LAMP环境之前的准备工作      在架构LAMP环境时,确保你的Linux系统已经安装了make.gcc.gcc-c++(使用rpm -q xxx 查看系统是否已经安装软件) 解压Lamp压缩包   下载地址:http://pan.baidu.com/s/1hq4hI5m   如果解麻烦的话,可以写一个自动解压脚本 1 cd /lamp #你解压的目录 2 3 ls *.tar.gz > ls.list 4 5 for tar

阿里云服务器ECS(Ubuntu)搭建nginx服务器 详细步骤教程

假设阿里云服务器的ip是192.168.1.10(读者需要换成自己的ip地址),并且没有选择任何安装包.现在ECS是完全空的,除了必须的Linux系统外,其他什么都没有.还有一点,本文选择的是Ubuntu Linux 14.04 64位版本.Linux比较节省内存,因为并没有UI部分,纯控制台操作. 本文只安装nginx服务器,后面的文章会配置其他环节,如PHP.Java.MySQL等. 在安装之前需要完成如下两项工作. 找一个工具上传文件(如nginx安装包) 连接到服务器的Console上,

线程-求救高手。。linux c编程,用进程池实现shell命令cp

问题描述 求救高手..linux c编程,用进程池实现shell命令cp 现在状态是,复制单个文件没问题,复制一个目录且目录里没有子目录也没问题. 如果目录里有一个子目录,这样貌似没有问题,但是如果有多个子目录,就会出现各种问题. 弄了一个晚上了,各种惆怅,求救各位老大帮忙看看. 本人新手,代码比较乱真的非常抱歉. //code start #include #include #include #include #include #include #include #include #inclu

列出C#进程以及详细信息

建立一个listBox将进程名称遍历进去 this.listBox1.Items.Clear(); Process[] MyProcesses=Process.GetProcesses(); foreach(Process MyProcess in MyProcesses) { this.listBox1.Items.Add(MyProcess.ProcessName); } this.listBox1.SelectedIndex=0; 选中listBox里面的项后将进程详细信息显示在右面的La