Python并发
Python语言虽然支持多线程编程,但还是需要取决于具体会用的操作系统。当然,现代的操作系统基本上都支持多线程。例如,Windows、Mac OS X、Linux、Solaris、FreeBSD等。 Python多线程在底层使用了兼容POSIX的线程,也就是众所周知的pthread。
Python中的多线程
在Python生态中,当我们说到多线程编程时,很多工程师就会提出Python的多线程编程是鸡肋的观点。他们的理由是,Python由于GIL锁的原因,并没有真正并发,因此,Python的多线程并不能真的提高程序的运行效率。那么,事实到底是怎么样的呢?
Python默认的解释器,由于全局解释器锁的存在,确实在任意时刻都只有一个线程在执行Python代码,致使多线程不能充分利用机器多核的特性。但是,我们的程序也不是无时无刻不在计算的,我们的程序需要等待用户输入、等待文件读写以及网络收发数据,这些都是比较费时的操作。使用Python多线程,计算机会将这些等待操作放到后台去处理,从而释放出宝贵的计算资源,继续进行计算。也就是说,如果读者的程序是CPU密集型的,使用Python的多线程确实无法提升程序的效率,如果读者的程序是IO密集型的,则可以使用Python的多线程提高程序的整体效率。
GIL
其他语言,CPU是多核时是支持多个线程同时执行。但在Python中,无论是单核还是多核,同时只能由一个线程在执行。其根源是GIL的存在。GIL的全称是Global Interpreter Lock(全局解释器锁),来源是Python设计之初的考虑,为了数据安全所做的决定。某个线程想要执行,必须先拿到GIL,我们可以把GIL看作是“通行证”,并且在一个Python进程中,GIL只有一个。拿不到通行证的线程,就不允许进入CPU执行。 而目前Python的解释器有多种,例如:
- CPython:CPython是用C语言实现的Python解释器。 作为官方实现,它是最广泛使用的Python解释器。
- PyPy:PyPy是用RPython实现的解释器。RPython是Python的子集, 具有静态类型。这个解释器的特点是即时编译,支持多重后端(C, CLI, JVM)。PyPy旨在提高性能,同时保持最大兼容性(参考CPython的实现)。
- Jython:Jython是一个将Python代码编译成Java字节码的实现,运行在JVM (Java Virtual Machine) 上。另外,它可以像是用Python模块一样,导入并使用任何Java类。
- IronPython:IronPython是一个针对 .NET 框架的Python实现。它可以用Python和 .NET framework的库,也能将Python代码暴露给 .NET框架中的其他语言。 GIL只在CPython中才有,而在PyPy和Jython中是没有GIL的。 每次释放GIL锁,线程进行锁竞争、切换线程,会消耗资源。这就导致打印线程执行时长,会发现耗时更长的原因。 并且由于GIL锁存在,Python里一个进程永远只能同时执行一个线程(拿到GIL的线程才能执行),这就是为什么在多核CPU上,Python 的多线程效率并不高的根本原因。
创建线程
在Python中创建线程非常简单,因为标准库自带了多线程相关的模块。Python的标准库提供了两个与线程相关的模块,分别是thread和threading。其中,thread是低级模块,threading是高级模块,threading模块对thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块即可。 下面是一个简单使用多线程的例子:
import threading
def say_hi(count, name):
while count > 0:
print("hello", name)
count -= 1
def main():
usernames = ['Bob', 'Jack', 'Pony', 'Jone', 'Mike']
for i in range(5):
thread = threading.Thread(target=say_hi, args=(50, usernames[i]))
thread.start()
if __name__ == '__main__':
main()
在这段程序中,我们的say_hi函数接受两个参数,分别是打印的次数和名字。在main函数中,我们首先定义了具有5个元素的列表。随后,在创建线程时将各个元素传递给不同的线程。可以看到,在Python中给线程传递参数非常简单,即通过Threading.Thread类调用的args参数传递。无论需要传递多少个参数,都是通过元组的形式进行传递。
线程的常用方法
我们已经使用了threading.Thread类的start方法来启动线程。threading.Thread类还有其他的一些方法,其中,比较常用的有:
- isAlive:检查线程是否在运行中;
- getName:获取线程的名称;
- setName:设置线程的名称;
- join:该方法会阻塞调用,直到线程中止;
- setDaemon:设置线程为守护线程;
- isDaemon:判断线程是否守护线程。
通过继承创建线程
除了直接构造threading.Thread对象以外,在Python中,我们还可以通过继承threading.Thread的方式编写多线程程序。通过继承threading.Thread类进行多线程编程,只需要在子类中实现run方法,并在run方法中实现该线程的业务逻辑即可。
import threading
class MyThread(threading.Thread):
def __init__(self, count, name):
super(MyThread, self).__init__()
self.count = count
self.name = name
def run(self):
while self.count > 0:
print("hello", self.name)
self.count -= 1
def main():
usernames = ['Bob', 'Jack', 'Pony', 'Jone', 'Mike']
for i in range(5):
thread = MyThread(50, username[i])
thread.start()
if __name__ == '__main__':
main()
线程同步与互斥锁
线程之所以比进程轻量,是因为多个线程之间是共享内存的。多个线程之间可以平等访问内存中的数据。因为多个线程可以共享数据,所以,为了保证数据的正确性需要对多个线程进行同步。
虽然Python中有全局解释器锁的存在,但是,在编写自己的程序时,依然要设法防止多个线程争用同一份数据。如果多个线程之间存在资源共享,在不加锁的前提下允许多个线程修改同一个对象,那么,程序的数据结构可能会遭到破坏。 在Python标准库的threading模块中有一个名为Lock的工厂函数,它会返回一个thread.LockType对象。该对象的acquire方法用来获取锁,release方法用来释放锁。对于那些每次只允许一个线程操作的数据,可以将其操作放到acquire和release方法之间。如下所示:
lock = threading.Lock()
lock.acquire()
# do something
lock.release()
为了保证无论在什么情况下,退出当前代码块时都能正确地释放锁,一般会将加锁和释放锁的操作放在try/finally语句之中。如下所示:
try:
lock.acquire()
# do something
finally:
lock.release()
当然,在Python语言中,我们还有更加简单的写法,即使用上下文管理器。如下所示:
with lock:
# do something
下面来看一个使用互斥锁的例子。在这个例子中,我们定义了一个全局变量,然后在main函数中创建了10个线程。每个线程要做的事情都是一样的,即将全局变量num累加100000次。由于num是一个全局变量,并且各个线程都需要更新这个变量,因此,存在数据争用的问题。为了解决这个问题,我们使用了threading.Lock来保护这个全局变量。所有要修改num这个全局变量的线程,在修改之前都需要加锁。在incre函数中,我们通过with语句来加锁。因此,下面的程序能够正确的计算出num的值。
import threading
lock = threading.Lock()
num = 0
def incre(count):
global num
while count > 0:
with lock:
num += 1
count -= 1
def main():
threads = []
for i in range(10):
thread = threading.Thread(target=incre, args=(100000,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
print("expected value is", 10 * 100000, "real value is", num)
if __name__ == '__main__':
main()
如果我们将incre函数中的加锁操作去掉,变成下面这个样子。那么,几乎不可能得到正确的结果。读者可以在自己的计算机上测试一下效果,不但每次都无法获取到正确的结果,而且,每一次计算的结果都不相同。
def incre(count):
global num
while count > 0:
num += 1
count -= 1
互斥锁
线程之间数据共享的。当多个线程对某一个共享数据进行操作时,就需要考虑到线程安全问题。threading模块中定义了Lock 类,提供了互斥锁的功能来保证多线程情况下数据的正确性。
用法的基本步骤:
#创建锁
mutex = threading.Lock()
#锁定
mutex.acquire([timeout])
#释放
mutex.release()
其中,锁定方法acquire可以有一个超时时间的可选参数timeout。如果设定了timeout,则在超时后通过返回值可以判断是否得到了锁,从而可以进行一些其他的处理。具体用法见示例代码:
import threading
import time
num = 0
mutex = threading.Lock()
class MyThread(threading.Thread):
def run(self):
global num
time.sleep(1)
if mutex.acquire(1):
num = num + 1
msg = self.name + ': num value is ' + str(num)
print(msg)
mutex.release()
if __name__ == '__main__':
for i in range(5):
t = MyThread()
t.start()
可重入锁(递归锁)
为了满足在同一线程中多次请求同一资源的需求,Python提供了可重入锁(RLock)。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire 的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。 具体用法如下:
#创建 RLock
mutex = threading.RLock()
class MyThread(threading.Thread):
def run(self):
if mutex.acquire(1):
print("thread " + self.name + " get mutex")
time.sleep(1)
mutex.acquire()
mutex.release()
mutex.release()
线程安全队列Queue
队列是线程间最常用的交换数据的形式,Queue模块实现了线程安全的队列,尤其适合多线程编程。Queue模块实现了三种类型队列:
- Queue Queue:一个先进先出(FIFO)的队列,最先加入队列的元素最先取出;
- LifoQueue LifoQueue:一个后进先出(LIFO)的队列,最后加入队列的元素最先取出;
- PriorityQueue PriorityQueue:优先级队列,队列中的元素根据优先级排序。
在这三种不同的队列中,Queue是最简单也是最常用的队列。如下所示:
import Queue
q = Queue.Queue()
for i in range(3):
q.put(i)
while not q.empty():
print(q.get())
Queue本身是一个先进先出的队列,我们使用put方法向队列中添加元素时,会将元素添加到队列的尾部。使用get方法从Queue中获取元素时,会从队列的头部取出元素。这里的例子仅仅演示了单线程的情况下,Queue最大的优势在于它是线程安全的,我们完全可以将Queue用于多线程环境中,而不用处理并发访问的情况。
在使用Queue进行多线程编程前,我们先看一下它的常用方法:
- empty:判断队列是否为空;
- full:判断队列是否已满;
- put:向队列中添加元素,可以通过可选的block参数和timeout参数控制put是否为阻塞等待。如果是阻塞等待,并且timeout是一个正数,那么,put方法将会在超时以后引发Queue.Full异常;
- put_nowait:非阻塞地向队列添加元素;
- get:从队列中取出元素,可以通过block参数控制是否阻塞等待,通过timeout控制阻塞等待的时间。如果超时也没有得到元素,抛出Queue.Empty异常;
- get_nowait:非阻塞地从队列中取出元素;
- task_done:与join一起工作,指示先前取出的元素已经完成处理;
- join:阻塞等待,直到所有消费者对每一个元素都调用了task_done。
下面是Python官方给出了多线程模型:
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = Queue()
for i in range(num_worker_threads):
t = Thread(target=worker)
t.daemon = True
t.start()
for item in source():
q.put(item)
q.join() # block until all tasks are done
在这段官方给出的例子中,首先创建了一个Queue对象,随后,创建了num_worker_threads个线程。每个线程都是一个无限循环,在循环中,从队列获取元素然后进行处理,处理完成以后调用task_done指示当前元素处理完毕。这段程序的第二个for循环就相当于生产者和消费者模型中的生产者,它从数据源中读取数据,然后放入到队列中等待消费者处理。
线程合并
join函数执行顺序是逐个执行每个线程,执行完毕后继续往下执行。主线程结束后,子线程还在运行,join函数使得主线程等到子线程结束时才退出。
import threading
def count(n):
while n > 0:
n -= 1
if __name__ == "__main__":
t1 = threading.Thread(target=count, args=("100000",))
t2 = threading.Thread(target=count, args=("100000",))
t1.start()
t2.start()
# 将 t1 和 t2 加入到主线程中
t1.join()
t2.join()
Python 多进程
Python要进行多进程操作,需要用到muiltprocessing库,其中的Process类跟threading模块的Thread类很相似。所以直接看代码熟悉多进程。
方法1:直接使用Process, 代码如下:
from multiprocessing import Process
def show(name):
print("Process name is " + name)
if __name__ == "__main__":
proc = Process(target=show, args=('subprocess',))
proc.start()
proc.join()
方法2:继承Process来自定义进程类,重写run方法, 代码如下:
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, name):
super(MyProcess, self).__init__()
self.name = name
def run(self):
print('process name :' + str(self.name))
time.sleep(1)
if __name__ == '__main__':
for i in range(3):
p = MyProcess(i)
p.start()
for i in range(3):
p.join()
多进程通信
进程之间不共享数据的。如果进程之间需要进行通信,则要用到Queue模块或者Pipe模块来实现。
Queue
Queue是多进程安全的队列,可以实现多进程之间的数据传递。它主要有两个函数put和get。
- put() 用以插入数据到队列中,put还有两个可选参数:blocked 和timeout。如果blocked为 True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出 Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
- get()可以从队列读取并且删除一个元素。同样get有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且 timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。 ```shell from multiprocessing import Process, Queue
def put(queue): queue.put(‘Queue 用法’)
if name == ‘main’:
queue = Queue()
pro = Process(target=put, args=(queue,))
pro.start()
print(queue.get())
pro.join()
### Pipe
Pipe的本质是进程之间的用管道数据传递,而不是数据共享,这和socket有点像。pipe() 返回两个连接对象分别表示管道的两端,每端都有send()和recv()函数。如果两个进程试图在同一时间的同一端进行读取和写入那么,这可能会损坏管道中的数据,具体用法如下:
```shell
from multiprocessing import Process, Pipe
def show(conn):
conn.send('Pipe 用法')
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
pro = Process(target=show, args=(child_conn,))
pro.start()
print(parent_conn.recv())
pro.join()
进程池
创建多个进程,我们不用傻傻地一个个去创建。我们可以使用Pool模块来搞定。Pool 常用的方法如下:
- apply() 同步执行(串行)
- apply_async() 异步执行(并行)
- terminate() 立刻关闭进程池
- join() 主进程等待所有子进程执行完毕。必须在close或terminate()之后使用
- close() 等待所有进程结束后,才关闭进程池 ```shell #coding: utf-8 import multiprocessing import time
def func(msg): print(“msg:”, msg) time.sleep(3) print(“end”)
if name == “main”: # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 pool = multiprocessing.Pool(processes = 3) for i in range(5): msg = “hello %d” %(i) # 非阻塞式,子进程不影响主进程的执行,会直接运行到 pool.join() pool.apply_async(func, (msg, ))
# 阻塞式,先执行完子进程,再执行主进程
# pool.apply(func, (msg, ))
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
# 调用join之前,先调用close函数,否则会出错。
pool.close()
# 执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
pool.join()
print("Sub-process(es) done.") ``` - 进程池Pool被创建出来后,即使实际需要创建的进程数远远大于进程池的最大上限,p.apply_async(test)代码依旧会不停的执行,并不会停下等待;相当于向进程池提交了10个请求,会被放到一个队列中; - 当执行完p1 = Pool(5)这条代码后,5条进程已经被创建出来了,只是还没有为他们各自分配任务,也就是说,无论有多少任务,实际的进程数只有5条,计算机每次最多5条进程并行。 - 当Pool中有进程任务执行完毕后,这条进程资源会被释放,pool会按先进先出的原则取出一个新的请求给空闲的进程继续执行; - 当Pool所有的进程任务完成后,会产生5个僵尸进程,如果主线程不结束,系统不会自动回收资源,需要调用join函数去回收。 - join函数是主进程等待子进程结束回收系统资源的,如果没有join,主程序退出后不管子进程有没有结束都会被强制杀死; - 创建Pool池时,如果不指定进程最大数量,默认创建的进程数为系统的内核数量.