Python多线程之生产者消费者

1、写在前面

经典的生产者和消费者模型中,\(N\)个线程根据要求生产出第一步处理结果,\(M\)个消费者线程对上述结果进行第二步处理。生产者与消费者之间有明确的前后顺序关系。

关于生产者消费者问题,wikipedia是这么描述的:

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

解决方案,wikipedia也给出了一个解释:

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题,常用的方法有信号灯法等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。

2、Python中的生产者消费者模型:

利用Python中Queue对象,可以很方便的解决生产者消费者问题。例如在程序中,创建两个Queue对象,前者是输入任务存放对象(task_queue),后者是该任务完成后的结果存放对象(out_queue)。生产者首先从task_queue中取出任务(Queue.get()),进行处理后,将处理结果存入out_queue,然后调用Queue.task_done()函数,表明处理完成。关于Queue.task_done()函数。官方文档给出说明如下:

Queue.task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

翻译如下:

Queue.task_done()

表明之前队列中的任务已经完成。由Queue的消费线程使用。 使用get()函数获取任务后,当调用task_done()函数时,表示当前所处理的任务已经完成。

如果使用join()阻塞当前执行,那么当所有的任务处理完成后,阻塞会解除。(意味着对每个放进(put())队列的项目处理后,会接收到1个task_done()通知).

调用次数大于Queue中放入项目数量时,会发出ValueError异常

简要理解:

  1. 每个put()进去的项目,使用get()获取,处理完成后,调用task_done()发送完成通知。
  2. 如果put()时,缓冲的Queue队列已满,则该线程阻塞,等待缓冲Queue有空间,缓冲队列有空间之后,put()立即执行。
  3. 如果使用join()阻塞了线程,那么所有任务完成后,接收到1个task_done()通知,阻塞解除,线程继续执行。
  4. 每个get()的项目对应一个task_done(),调用task_done()的次数大于get()时,会发出ValueError异常。

最后需要说明一点的是,Queue对象的两个成员函数,put()get(),在目标队列已满(put())或为空时(get())均会阻塞线程,一旦条件满足能够放入或取出数据时,线程解除阻塞,继续向下执行。

实际使用时,可以采用两种方案:

  1. 一个对象一个线程方式,每个对象管理自己的线程;
  2. 一个对象管理多个线程的方式,

上代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# 一个对象一个线程方式
from queue import Queue
from threading import Thread
import logging
import time
import random

logging.basicConfig(level=logging.DEBUG,format='[%(threadName)-9s] %(message)s')

def fib(n):
if n<=2:
return 1
return fib(n-1)+fib(n-2)

class Producer(Thread):

def __init__(self,task_queue, out_queue,name=None):
super().__init__()
if name is not None:
self.name = name
self.__task_q = task_queue
self.__rets_q = out_queue
# 去掉下面依据注释,可以发现,该函数为主线程执行
# logging.debug("(Producer.__init__)任务大小:{}".format(task_queue.qsize()))

def run(self):
while True:
logging.info('P: 任务队列大小 {},缓冲队列大小 {}'.format(self.__task_q.qsize(),self.__rets_q.qsize()))
item = self.__task_q.get()
result = self.job(item)
logging.info("P: 值= {} , 结果= {} ".format(item,result))
self.__rets_q.put(result)
logging.info("P: -------------压入缓冲队列 {}--------------".format(item))
self.__task_q.task_done()
time.sleep(random.randint(1,5)/40)

def job(self,data):
logging.debug("P开始处理数据(in job): {}".format(data))
#time.sleep(random.randint(1,5)/40)
d = data*3
logging.debug("P数据处理完成(in job):{}*3={} ".format(data,d))
return d


class Consumer(Thread):
def __init__(self,task_queue,name=None):
super().__init__()
if name is not None:
self.name = name
self.__tasks_q = task_queue

def run(self):
while True:
logging.info('C: 任务队列大小 {} '.format(self.__tasks_q.qsize()))
item = self.__tasks_q.get()
logging.info('C:-------------取出任务数据 {}({})--------------'.format(item,item//3))
result = self.job(item)
logging.info("C: 值= {} , 结果= {} ".format(item,result))
self.__tasks_q.task_done()

def job(self,data):
logging.debug("C开始处理数据(in job): {}".format(data))
time.sleep(random.randint(1,5)/40)
d = fib(data)
logging.debug("C数据处理完成(in job): --------------({})fib({})={}----------".format(data//3,data,d))
return d

def main():
# N 生产者数量
# M 消费者数量
N = 3
M = 1

# 任务队列
task_queue = Queue()
# 处理后任务队列
ret_queue = Queue()

# 初始化任务队列
for i in range(1,5):
task_queue.put(i)

# 创建N个生产者(每个生产者1个线程)
producers = []
for i in range(N):
p = Producer(task_queue=task_queue, out_queue=ret_queue, name=' 生产者{:02d}'.format(i+1))
producers.append(p)
p.start()

# 创建M个消费者(每个消费者1个线程)
consumers = []
for i in range(M):
c = Consumer(task_queue=ret_queue, name=' 消费者{:02d}'.format(i+1))
consumers.append(c)
c.start()

# 阻塞线程,等待任务完成
task_queue.join()
ret_queue.join()

# 当任务完成后,主线程退出,子线程也随之退出


if __name__ == "__main__":
logging.debug("开始运行.......")
main()
logging.debug('运行完成.....')

例程2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# 一个对象多个线程方式
from threading import Thread
from queue import Queue
import time
import logging
import random

logging.basicConfig(level=logging.DEBUG,format='(%(threadName)-9s) %(message)s',)

def fib(n):
if n<=2:
return 1
return fib(n-1)+fib(n-2)

class Producer:
def __init__(self, task_queue, out_queue, threadCount=3):
self.__task_queue = task_queue
self.__out_queue = out_queue
self.__thrdCount = threadCount
self.__threads = []
for i in range(threadCount):
p = Thread(name="生产者{:02d}".format(i+1),target=self.thread_worker)
p.setDaemon(True)
self.__threads.append(p)
p.start()

def thread_worker(self):
while True:
logging.info('P: 任务队列大小 {},缓冲队列大小 {}'.format(self.__task_queue.qsize(),self.__out_queue.qsize()))
#if not self.__out_queue.full():
item = self.__task_queue.get()
result = self.job(item)
logging.info("P: 值= {} , 结果= {} ".format(item,result))
self.__out_queue.put(result)
logging.info("P: -------------压入缓冲队列 {}--------------".format(item))
self.__task_queue.task_done()
#else:
# logging.info('P: ---缓冲队列已满,等待中zzzZZZ...')
time.sleep(random.randint(1,5)/40)

def job(self,data):
logging.debug("P开始处理数据(in job): {}".format(data))
#time.sleep(random.randint(1,5)/40)
d = data*3
logging.debug("P数据处理完成(in job):{}*3={} ".format(data,d))
return d

class Consumer:
def __init__(self, task_queue, threadCount=4):
self.__task_queue= task_queue
self.__thrdCount = threadCount
self.__threads = []
for i in range(threadCount):
c = Thread(name="消费者{:02d}".format(i+1),target=self.thread_worker)
c.setDaemon(True)
self.__threads.append(c)
c.start()

def thread_worker(self):
while True:
logging.info('C: 任务队列大小 {} '.format(self.__task_queue.qsize()))
item = self.__task_queue.get()
logging.info('C:-------------取出任务数据 {}({})--------------'.format(item,item//3))
result = self.job(item)
logging.info("C: 值= {} , 结果= {} ".format(item,result))
self.__task_queue.task_done()

def job(self,data):
logging.debug("C开始处理数据(in job): {}".format(data))
time.sleep(random.randint(1,5)/40)
d = fib(data)
logging.debug("C数据处理完成(in job): --------------({})fib({})={}----------".format(data//3,data,d))
return d

if __name__ == '__main__':
logging.info("任务开始.........")

BUFFER_SIZE=2
tasks_queue = Queue()
ret_queue = Queue(maxsize=BUFFER_SIZE)


for i in range(1,5):
tasks_queue.put(i)


p = Producer(task_queue=tasks_queue,out_queue=ret_queue,threadCount=3)
c = Consumer(task_queue=ret_queue,threadCount=1)


tasks_queue.join()
ret_queue.join()

logging.info("任务完成!")