Суть кода: имеется класс Worker, метод run которого работает в цикле до тех по пока не получит исколючение о том что очередь пуста. В обычном случае все работает прекрасно, я же имитировал ситуацию что при выполнении задачи происходит разовая ошибка, в таком случае поток должен поместить невыполненную задачу в очередь, что бы какой либо поток в следующий раз ее выполнил.
Суть проблемы: имеются 100 задач и 20 потоков, после отрабатывания всех задач потоки завершают работу, остается 1 основной поток, но выполнение зависает на моменте self.queue.join().
Обходное решение я сделал,но тут вопрос именно в том что почему так происходит и как должно быть правильно.
Код ниже готов к выполнению, комментарии указаны, выводится отладочная информация которая отображает ход работы.
# -*- coding: utf-8 -*-
import Queue, thread, threading, time, random, datetime, sys
# переменная для имитации разовой ошибки
err=False
# счетчик возбуждения исключений Queue.Empty
count_of_empty=0
class Worker(threading.Thread):
"""
Класс потока который будет брать задачи из очереди и выполнять их до успешного
окончания
"""
def __init__(self, queue, output):
# Обязательно инициализируем супер класс (класс родитель)
super(Worker,self).__init__()
# Устанавливаем поток в роли демона, это необходимо что бы по окончании выполнения
# метода run() поток корректно завершил работу,а не остался висеть в ожидании
self.setDaemon(True)
# экземпляр класса содержит в себе очередь что бы при выполнении потока иметь к ней доступ
self.queue=queue
self.output=output
def run(self):
"""
Основной код выполнения потока должен находиться здесь
"""
while True:
try:
# переменная для иммитации единичной ошибки во время выполнения потока
global err
# фиксируем время начала работы потока
start=datetime.datetime.now().strftime('%H:%M:%S')
# запрашиваем из очереди объект
target=self.queue.get(block=False)
print '%s get target: %s'%(self.getName(), target)
# эмулируем однократно возникающую ошибку
if ((target==2) and (not err)):
err=True
raise Exception('test error')
# делаем видимость занятости потока
# путем усыпления его на случайную величину
sleep_time=random.randint(0,10)
time.sleep(sleep_time)
print '%s %s target: %s sleep %ss'%(start, self.getName(), target, sleep_time)
# сообщаем о том что задача для полученного объекта из очереди выполнена
self.output.put(target, block=False)
self.queue.task_done()
# После того как очередь опустеет будет сгенерировано исключение
except Queue.Empty:
global count_of_empty
count_of_empty=count_of_empty+1
sys.stderr.write('%s get Queue.EMPTY exception\r\nCount of empty = %s'%(self.getName(), count_of_empty))
# сообщаем что задание выполнено
#self.queue.task_done()
# выходим из бесконченого цикла
break
# если при выполнении потока будет сгенерировано исключение об ошибке,
# то оно будет обработано ниже
except Exception, e:
# выводим на экран имя потока и инфо об ошибке
sys.stderr.write('%s get %s exception\r\n'%(self.getName(), e))
# Предполагаем раз объект из очереди не был корреткно обработан,
# то добавляем его в очередь
self.queue.put(target, block=False)
class Test(object):
def __init__(self, data, number_threads):
# создаем экземпля класса очереди Queue
self.queue=Queue.Queue()
self.output=Queue.Queue()
# заполняем очередь
for item in data:
self.queue.put(item)
# определяем количество потоков которые будут обслуживать очередь
self.NUMBER_THREADS=number_threads
# список экземпляров класса потока, в последствии можно
# обратиться к нему что бы получать сведения о состоянии потоков
self.threads=[]
def execute(self):
# создаем экземпляра классов потоков и запускаем их
for i in xrange(self.NUMBER_THREADS):
self.threads.append(Worker(self.queue, self.output))
self.threads[-1].start()
# Блокируем выполнение кода до тех пор пока не будут выполнены все
# элементы очереди. Это означает что сколкьо раз были добавлены элементы
# очереди, то столько же раз должен быть вызван task_done().
self.queue.join()
t=datetime.datetime.now()
test=Test(range(100), 20)
test.execute()
print 'the end in %s'%(datetime.datetime.now()-t)
# вывод debug информации
print len(list(test.output.__dict__['queue']))
print sorted(list(test.output.__dict__['queue']))