进程与线程---Python_Daily

  • 进程和线程

  在操作系统看来,一个任务就是一个进程,而一个进程内部如果要做多个任务就是有多个线程。一个进程至少有一个线程。

  真正的并行执行任务是由多个CUP分别执行任务,实际中是由,操作系统轮流让各个任务交替执行,任务1执行0.01秒,任务2执行0.01秒,之后再依次切换。

  Python中支持两种模式:

  多进程模式

  多线程模式

 

  • 多进程

  Linux操作系统下,提供了一个fork()系统调用。调用一次fork(),返回两次,因为操作系统自动把当前的进程(作为父进程)复制了一份(称为子进程),然后子进程返回0,父进程返回子进程的ID。

# multiprocessing.py
import os

print 'Process (%s) start...' % os.getpid()
pid = os.fork()
if pid==0:
    print 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())
else:
    print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)

Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

  由于windows下没有fork()调用,提供了multiprocessing模块进行跨平台版本的多进程模块。

用Process类代表创建进程对象,传入一个执行函数和函数的参数。之后再用start()方法启动,jion()方法可以等待子进程结束后再继续往下进行,通常用于进程间的同步。

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Process(target=run_proc, args=('test',))
    print 'Process will start.'
    p.start()
    p.join()
    print 'Process end.'

Parent process 928.
Process will start.
Run child process test (929)...
Process end.

  Pool进程池创建多个子进程

  对Pool对象创建多个子进程后,用close()方法结束创建,再用join()方法等待所有子进程执行完毕。在每个子进程中会随机休眠一段时间,其他的子进程在这段休眠时间里就会调用。

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print 'Run task %s (%s)...' % (name, os.getpid())
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print 'Task %s runs %0.2f seconds.' % (name, (end - start))

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print 'Waiting for all subprocesses done...'
    p.close()
    p.join()
    print 'All subprocesses done.'

Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.

  进程间通信

  Python的miltiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。

  在父进程中创建两个子进程,一个往Queue中写数据,一个从Queue里读数据。

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    for value in ['A', 'B', 'C']:
        print 'Put %s to queue...' % value
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    while True:
        value = q.get(True)
        print 'Get %s from queue.' % value

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
  • 多线程

  Python中提供两个模块,thread是低级模块,threading是高级模块,对thread进行了封装。

import time, threading

# 新线程执行的代码:
def loop():
    print 'thread %s is running...' % threading.current_thread().name
    n = 0
    while n < 5:
        n = n + 1
        print 'thread %s >>> %s' % (threading.current_thread().name, n)
        time.sleep(1)
    print 'thread %s ended.' % threading.current_thread().name

print 'thread %s is running...' % threading.current_thread().name
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print 'thread %s ended.' % threading.current_thread().name

thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.

  多进程和多线程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改。因此,线程之间共享数据最大的危险在于多个线程同时该变一个变量,把内容给改乱了。  

  因此得加上一把锁lock

balance = 0
lock = threading.Lock()

def run_thread(n):
    for i in range(100000):
        # 先要获取锁:
        lock.acquire()
        try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了一定要释放锁:
            lock.release()

   当多个线程同时执行lock.acquire()时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就继续等待直到获得锁为止。

  但是,这样实际上就不是并行处理了。

  Python的多进程由于存在GIL锁的问题,所以多线程实际上不能有效利用多核。多线程的并发在Python中是无用的。

  • ThreadLocal

  全局变量local_school就是一个ThreadLoacl对象,每个Thread对它都可以读写student属性,但是互不影响。可以把local_school看成全局变量,但每个属性如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。

import threading

# 创建全局ThreadLocal对象:
local_school = threading.local()

def process_student():
    print 'Hello, %s (in %s)' % (local_school.student, threading.current_thread().name)

def process_thread(name):
    # 绑定ThreadLocal的student:
    local_school.student = name
    process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
  • 进程vs线程

  多进程的优点是稳定性高,一个崩溃,不会影响其他的进程,但是,代价大,在linux下,调用fork还可以,但是windows下进程开销巨大。

  多线程模式比多进程快一点,但是也快不了多少,缺点十分明显,由于共享进程的内存,一个线程崩了,就都崩了。

  计算密集型和IO密集型:

  计算密集型会消耗大量的CPU资源,代码的运行效率就至关重要,Python等脚本语言运行效率低,不适合。

  IO密集型涉及到网络、磁盘IO的任务,它们的CUP消耗较少,任务的主要时间在等待IO操作完成,CUP效率无法完全使用,所以适合开发效率高的语言。

  现代操作系统对IO操作进行了巨大的改进,支持异步IO。利用异步IO,就可以用单进程模型来执行多任务,这种全新的模型称为事件驱动型。

  • 分布式进程

  多台电脑协助工作,一台电脑作为调度者,依靠网络通信,将任务分布到其他电脑的进程中。

  通过manager模块把Queue通过网络暴露出去,让其他机器的进程可以访问Queue

服务器继承中,负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:

# taskmanager.py

import random, time, Queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = Queue.Queue()
# 接收结果的队列:
result_queue = Queue.Queue()

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
    pass

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey='abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# 关闭:
manager.shutdown()

在另一台机器上启动任务进程:

# taskworker.py

import time, sys, Queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行taskmanager.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与taskmanager.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey='abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 处理结束:
print('worker exit.')

服务进程启动如下:

$ python taskmanager.py
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...

工作进程启动如下:

$ python taskworker.py 127.0.0.1
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.

等到工作进程结束后,服务进程如下:

Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956

  注意Queue的作用是来传递任务和接受结果的,每个任务的描述量要尽量小。比如发送一个处理日志文件的任务,不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。

时间: 2025-01-20 14:43:29

进程与线程---Python_Daily的相关文章

四种进程或线程同步互斥的控制方法

进程|控制 很想整理一下自己对进程线程同步互斥的理解.正巧周六一个刚刚回到学校的同学请客吃饭.在吃饭的过程中,有两个同学,为了一个问题争论的面红耳赤.一个认为.Net下的进程线程控制模型更加合理.一个认为Java下的线程池策略比.Net的好.大家的话题一下转到了进程线程同步互斥的控制问题上.回到家,想了想就写了这个东东.  现在流行的进程线程同步互斥的控制机制,其实是由最原始最基本的4种方法实现的.由这4种方法组合优化就有了.Net和Java下灵活多变的,编程简便的线程进程控制手段.  这4种方

C语言:进程vs线程,如何选择?

进程vs线程,如何选择? 我们编写程序,到底是采用多线程还是多进程?这里是有区别的,采用不同的机制能够获得的效率也不一样.如何选择适合我们自己的程序的机制呢?下面是一些常见的选择的看法,不过也只是提供给大家参考参考,具体设计的时候还是要自己处理. ¨ 一个程序里面的所有的线程都在同一个运行空间中执行.而一个程序的子进程则是运行在另外的执行空间中的,这里是通过调用了exec函数来实现的. ¨ 同一个进程中的某个线程的故障可以影响其它的线程,因为所有的线程共享同一个虚拟内存空间以及其他资源.例如,某

Java中的进程与线程的实现

概述 进程与线程,本质意义上说, 是操作系统的调度单位,可以看成是一种操作系统 "资源" .Java 作为与 平台无关的编程语言,必然会对底层(操作系统)提供的功能进行进一步的封装,以平台无关的编程接口供程序员使用,进 程与线程作为操作系统核心概念的一部分无疑亦是如此.在 Java 语言中,对进程和线程的封装,分别提供了 Process 和 Thread 相关的一些类.本文首先简单的介绍如何使用这些类来创建进程和线程,然后着重介绍这些类是如何和操作系统本 地进程线程相对应的,给出了 J

进程与线程的关系和区别 CPU调度简介

进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位. 线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位. 进程和线程的关系: (1)一个线程只能属于一个进程,而一个进程可以有多个线程,但至少有一个线程. (2)资源分配给进程,同一进程的所有线程共享该进程的所有资源. (3)处理机分给线程,即真正在处理机上运行的是线程. (4)线程在执行过程中,需要协作同步.不同进程的线程间要利用消息通信的办法实现同步.

API之进程和线程函数

CancelWaitableTimer 这个函数用于取消一个可以等待下去的计时器操作 CallNamedPipe 这个函数由一个希望通过管道通信的一个客户进程调用 ConnectNamedPipe 指示一台服务器等待下去,直至客户机同一个命名管道连接 CreateEvent 创建一个事件对象 CreateMailslot 创建一个邮路.返回的句柄由邮路服务器使用(收件人) CreateMutex 创建一个互斥体(MUTEX) CreateNamedPipe 创建一个命名管道.返回的句柄由管道的服

MFC教程(9)-- MFC的进程和线程(1)

MFC定义了多种状态信息,这里要介绍的是模块状态.进程状态.线程状态.这些状态可以组合在一起,例如MFC句柄映射就是模块和线程局部有效的,属于模块-线程状态的一部分. 模块状态 这里模块的含义是:一个可执行的程序或者一个使用MFC DLL的DLL,比如一个OLE控件就是一个模块. 一个应用程序的每一个模块都有一个状态,模块状态包括这样一些信息:用来加载资源的 Windows实例句柄.指向当前CWinApp或者CWinThread对象的指针.OLE模块的引用计数.Windows对象与相应的MFC对

MFC教程(8)-- MFC的进程和线程

Win32的进程和线程概念 进程是一个可执行的程序,由私有虚拟地址空间.代码.数据和其他操作系统资源(如进程创建的文件.管道.同步对象等)组成.一个应用程序可以有一个或多个进程,一个进程可以有一个或多个线程,其中一个是主线程. 线程是操作系统分时调度分配CPU时间的基本实体.一个线程可以执行程序的任意部分的代码,即使这部分代码被另一个线程并发地执行:一个进程的所有线程共享它的虚拟地址空间.全局变量和操作系统资源. 之所以有线程这个概念,是因为以线程而不是进程为调度对象效率更高: 由于创建新进程必

Android中的进程和线程

进程和线程是现代网络操作系统的核心概念.Android作为一种优秀的.承袭Linux的移动操作系统,其进程和线程的概念是开发者和安全工作人员需要深入了解的问题.本文将详细介绍Android中的进程.线程以及相关的技术问题. 进程和线程的基本概念 当一个应用程序开始运行它的第一个组件时,Android会为它启动一个Linux进程,并在其中执行一个单一的线程.默认情况下,应用程序所有的组件均在这个进程的这个线程中运行.然而,你也可以安排组件在其他进程中运行,而且可以为任意进程衍生出其它线程. And

Linux多任务编程(一) 任务、进程、线程

Linux下多任务介绍 首先,先简单的介绍一下什么叫多任务系统?任务.进程.线程分别是什么? 它们之间的区别是什么?,从而可以宏观的了解一下这三者,然后再针对每一个仔细的讲解. 什么叫 多任务系统?多任务系统指可以同一时间内运行多个应用程序,每个应用程序被称作一个任务. 任务 定义:任务是一个逻辑概念,指由一个软件完成的任务,或者是一系列共同达到某一目的的操作. 进程 定义:进程是指一个具有独立功能的程序在某个数据集上的一次动态执行过程,它是系统进行资源分配和调度 的最小单元. 线程定义:线程是