Python多线程之生产者消费者
1、写在前面
经典的生产者和消费者模型中,\(N\)个线程根据要求生产出第一步处理结果,\(M\)个消费者线程对上述结果进行第二步处理。生产者与消费者之间有明确的前后顺序关系。
关于生产者消费者问题,wikipedia是这么描述的:
生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
解决方案,wikipedia也给出了一个解释:
要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯法等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。
2、Python中的生产者消费者模型:
利用Python中Queue
对象,可以很方便的解决生产者消费者问题。例如在程序中,创建两个Queue
对象,前者是输入任务存放对象(task_queue
),后者是该任务完成后的结果存放对象(out_queue
)。生产者首先从task_queue
中取出任务(Queue.get()
),进行处理后,将处理结果存入out_queue
,然后调用Queue.task_done()
函数,表明处理完成。关于Queue.task_done()
函数。官方文档给出说明如下:
Queue.task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items placed in the queue.
翻译如下:
Queue.task_done()
表明之前队列中的任务已经完成。由Queue的消费线程使用。 使用
get()
函数获取任务后,当调用task_done()
函数时,表示当前所处理的任务已经完成。如果使用
join()
阻塞当前执行,那么当所有的任务处理完成后,阻塞会解除。(意味着对每个放进(put()
)队列的项目处理后,会接收到1个task_done()
通知).调用次数大于Queue中放入项目数量时,会发出
ValueError
异常
简要理解:
- 每个
put()
进去的项目,使用get()
获取,处理完成后,调用task_done()
发送完成通知。 - 如果
put()
时,缓冲的Queue
队列已满,则该线程阻塞,等待缓冲Queue
有空间,缓冲队列有空间之后,put()
立即执行。 - 如果使用
join()
阻塞了线程,那么所有任务完成后,接收到1个task_done()
通知,阻塞解除,线程继续执行。 - 每个
get()
的项目对应一个task_done()
,调用task_done()
的次数大于get()
时,会发出ValueError
异常。
最后需要说明一点的是,Queue
对象的两个成员函数,put()
、get()
,在目标队列已满(put()
)或为空时(get()
)均会阻塞线程,一旦条件满足能够放入或取出数据时,线程解除阻塞,继续向下执行。
实际使用时,可以采用两种方案:
- 一个对象一个线程方式,每个对象管理自己的线程;
- 一个对象管理多个线程的方式,
上代码:
1 | # 一个对象一个线程方式 |
例程2: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94# 一个对象多个线程方式
from threading import Thread
from queue import Queue
import time
import logging
import random
logging.basicConfig(level=logging.DEBUG,format='(%(threadName)-9s) %(message)s',)
def fib(n):
if n<=2:
return 1
return fib(n-1)+fib(n-2)
class Producer:
def __init__(self, task_queue, out_queue, threadCount=3):
self.__task_queue = task_queue
self.__out_queue = out_queue
self.__thrdCount = threadCount
self.__threads = []
for i in range(threadCount):
p = Thread(name="生产者{:02d}".format(i+1),target=self.thread_worker)
p.setDaemon(True)
self.__threads.append(p)
p.start()
def thread_worker(self):
while True:
logging.info('P: 任务队列大小 {},缓冲队列大小 {}'.format(self.__task_queue.qsize(),self.__out_queue.qsize()))
#if not self.__out_queue.full():
item = self.__task_queue.get()
result = self.job(item)
logging.info("P: 值= {} , 结果= {} ".format(item,result))
self.__out_queue.put(result)
logging.info("P: -------------压入缓冲队列 {}--------------".format(item))
self.__task_queue.task_done()
#else:
# logging.info('P: ---缓冲队列已满,等待中zzzZZZ...')
time.sleep(random.randint(1,5)/40)
def job(self,data):
logging.debug("P开始处理数据(in job): {}".format(data))
#time.sleep(random.randint(1,5)/40)
d = data*3
logging.debug("P数据处理完成(in job):{}*3={} ".format(data,d))
return d
class Consumer:
def __init__(self, task_queue, threadCount=4):
self.__task_queue= task_queue
self.__thrdCount = threadCount
self.__threads = []
for i in range(threadCount):
c = Thread(name="消费者{:02d}".format(i+1),target=self.thread_worker)
c.setDaemon(True)
self.__threads.append(c)
c.start()
def thread_worker(self):
while True:
logging.info('C: 任务队列大小 {} '.format(self.__task_queue.qsize()))
item = self.__task_queue.get()
logging.info('C:-------------取出任务数据 {}({})--------------'.format(item,item//3))
result = self.job(item)
logging.info("C: 值= {} , 结果= {} ".format(item,result))
self.__task_queue.task_done()
def job(self,data):
logging.debug("C开始处理数据(in job): {}".format(data))
time.sleep(random.randint(1,5)/40)
d = fib(data)
logging.debug("C数据处理完成(in job): --------------({})fib({})={}----------".format(data//3,data,d))
return d
if __name__ == '__main__':
logging.info("任务开始.........")
BUFFER_SIZE=2
tasks_queue = Queue()
ret_queue = Queue(maxsize=BUFFER_SIZE)
for i in range(1,5):
tasks_queue.put(i)
p = Producer(task_queue=tasks_queue,out_queue=ret_queue,threadCount=3)
c = Consumer(task_queue=ret_queue,threadCount=1)
tasks_queue.join()
ret_queue.join()
logging.info("任务完成!")