Найти - Пользователи
Полная версия: Проблема: Вечное ожидание при работе с очередями Queue [РЕШЕНО]
Начало » Python для экспертов » Проблема: Вечное ожидание при работе с очередями Queue [РЕШЕНО]
1
JOHN_16
Приветствую всех. При знакомстве с модуем Queue возникла проблема, которая путем долгих обдумываний, чтения документации и распросов Гугла не разрешилась.

Суть кода: имеется класс Worker, метод run которого работает в цикле до тех по пока не получит исколючение о том что очередь пуста. В обычном случае все работает прекрасно, я же имитировал ситуацию что при выполнении задачи происходит разовая ошибка, в таком случае поток должен поместить невыполненную задачу в очередь, что бы какой либо поток в следующий раз ее выполнил.

Суть проблемы: имеются 100 задач и 20 потоков, после отрабатывания всех задач потоки завершают работу, остается 1 основной поток, но выполнение зависает на моменте self.queue.join().

Обходное решение я сделал,но тут вопрос именно в том что почему так происходит и как должно быть правильно.

Код ниже готов к выполнению, комментарии указаны, выводится отладочная информация которая отображает ход работы.
# -*- coding: utf-8 -*-
import Queue, thread, threading, time, random, datetime, sys

# переменная для имитации разовой ошибки
err=False
# счетчик возбуждения исключений Queue.Empty
count_of_empty=0

class Worker(threading.Thread):
"""
Класс потока который будет брать задачи из очереди и выполнять их до успешного
окончания
"""
def __init__(self, queue, output):
# Обязательно инициализируем супер класс (класс родитель)
super(Worker,self).__init__()
# Устанавливаем поток в роли демона, это необходимо что бы по окончании выполнения
# метода run() поток корректно завершил работу,а не остался висеть в ожидании
self.setDaemon(True)
# экземпляр класса содержит в себе очередь что бы при выполнении потока иметь к ней доступ
self.queue=queue
self.output=output

def run(self):
"""
Основной код выполнения потока должен находиться здесь
"""
while True:
try:
# переменная для иммитации единичной ошибки во время выполнения потока
global err
# фиксируем время начала работы потока
start=datetime.datetime.now().strftime('%H:%M:%S')
# запрашиваем из очереди объект
target=self.queue.get(block=False)
print '%s get target: %s'%(self.getName(), target)

# эмулируем однократно возникающую ошибку
if ((target==2) and (not err)):
err=True
raise Exception('test error')

# делаем видимость занятости потока
# путем усыпления его на случайную величину
sleep_time=random.randint(0,10)
time.sleep(sleep_time)
print '%s %s target: %s sleep %ss'%(start, self.getName(), target, sleep_time)
# сообщаем о том что задача для полученного объекта из очереди выполнена
self.output.put(target, block=False)
self.queue.task_done()
# После того как очередь опустеет будет сгенерировано исключение
except Queue.Empty:
global count_of_empty
count_of_empty=count_of_empty+1
sys.stderr.write('%s get Queue.EMPTY exception\r\nCount of empty = %s'%(self.getName(), count_of_empty))
# сообщаем что задание выполнено
#self.queue.task_done()
# выходим из бесконченого цикла
break
# если при выполнении потока будет сгенерировано исключение об ошибке,
# то оно будет обработано ниже
except Exception, e:
# выводим на экран имя потока и инфо об ошибке
sys.stderr.write('%s get %s exception\r\n'%(self.getName(), e))
# Предполагаем раз объект из очереди не был корреткно обработан,
# то добавляем его в очередь
self.queue.put(target, block=False)

class Test(object):
def __init__(self, data, number_threads):
# создаем экземпля класса очереди Queue
self.queue=Queue.Queue()
self.output=Queue.Queue()
# заполняем очередь
for item in data:
self.queue.put(item)
# определяем количество потоков которые будут обслуживать очередь
self.NUMBER_THREADS=number_threads
# список экземпляров класса потока, в последствии можно
# обратиться к нему что бы получать сведения о состоянии потоков
self.threads=[]

def execute(self):
# создаем экземпляра классов потоков и запускаем их
for i in xrange(self.NUMBER_THREADS):
self.threads.append(Worker(self.queue, self.output))
self.threads[-1].start()

# Блокируем выполнение кода до тех пор пока не будут выполнены все
# элементы очереди. Это означает что сколкьо раз были добавлены элементы
# очереди, то столько же раз должен быть вызван task_done().
self.queue.join()

t=datetime.datetime.now()
test=Test(range(100), 20)
test.execute()
print 'the end in %s'%(datetime.datetime.now()-t)
# вывод debug информации
print len(list(test.output.__dict__['queue']))
print sorted(list(test.output.__dict__['queue']))
Soteric
А что отображается в выводе?

Похоже проблема в том, что когда бросается исключение, то вы добавляете новую таску в очередь, но не делаете при этом task_done. В итоге получается, что счетчик заданий в очереди увеличился на единицу, а счетчик законченных остался на прежнем месте. И хотя очередь пуста, основной поток не может продолжить работу, посколько число вызовов task_done меньше вызовов put. task_done надо делать в любом случае.
JOHN_16
Soteric
Спасибо, именно так и было. Блин, и почему сам не додумался до этого вполне логичного решения…
Итого решение выглядит так:
# -*- coding: utf-8 -*-
import Queue, thread, threading, time, random, datetime, sys

# переменная для имитации разовой ошибки
err=False
# счетчик возбуждения исключений Queue.Empty
count_of_empty=0

class Worker(threading.Thread):
"""
Класс потока который будет брать задачи из очереди и выполнять их до успешного
окончания
"""
def __init__(self, queue, output):
# Обязательно инициализируем супер класс (класс родитель)
super(Worker,self).__init__()
# Устанавливаем поток в роли демона, это необходимо что бы по окончании выполнения
# метода run() поток корректно завершил работу,а не остался висеть в ожидании
self.setDaemon(True)
# экземпляр класса содержит в себе очередь что бы при выполнении потока иметь к ней доступ
self.queue=queue
self.output=output

def run(self):
"""
Основной код выполнения потока должен находиться здесь
"""
while True:
try:
# переменная для иммитации единичной ошибки во время выполнения потока
global err
# фиксируем время начала работы потока
start=datetime.datetime.now().strftime('%H:%M:%S')
# запрашиваем из очереди объект
target=self.queue.get(block=False)
print '%s get target: %s'%(self.getName(), target)

# эмулируем однократно возникающую ошибку
if ((target==2) and (not err)):
err=True
raise Exception('test error')

# делаем видимость занятости потока
# путем усыпления его на случайную величину
sleep_time=random.randint(0,10)
time.sleep(sleep_time)
print '%s %s target: %s sleep %ss'%(start, self.getName(), target, sleep_time)
# сообщаем о том что задача для полученного объекта из очереди выполнена
self.output.put(target, block=False)
self.queue.task_done()
# После того как очередь опустеет будет сгенерировано исключение
except Queue.Empty:
global count_of_empty
count_of_empty=count_of_empty+1
sys.stderr.write('%s get Queue.EMPTY exception\r\nCount of empty = %s'%(self.getName(), count_of_empty))
# выходим из бесконченого цикла
break
# если при выполнении потока будет сгенерировано исключение об ошибке,
# то оно будет обработано ниже
except Exception, e:
# сообщаем что задание выполнено, что бы потом вновь добавить его в очередь
self.queue.task_done()
# выводим на экран имя потока и инфо об ошибке
sys.stderr.write('%s get %s exception\r\n'%(self.getName(), e))
# Предполагаем раз объект из очереди не был корреткно обработан,
# то добавляем его в очередь
self.queue.put(target, block=False)

class Test(object):
def __init__(self, data, number_threads):
# создаем экземпля класса очереди Queue
self.queue=Queue.Queue()
self.output=Queue.Queue()
# заполняем очередь
for item in data:
self.queue.put(item)
# определяем количество потоков которые будут обслуживать очередь
self.NUMBER_THREADS=number_threads
# список экземпляров класса потока, в последствии можно
# обратиться к нему что бы получать сведения о состоянии потоков
self.threads=[]

def execute(self):
# создаем экземпляра классов потоков и запускаем их
for i in xrange(self.NUMBER_THREADS):
self.threads.append(Worker(self.queue, self.output))
self.threads[-1].start()

# Блокируем выполнение кода до тех пор пока не будут выполнены все
# элементы очереди. Это означает что сколкьо раз были добавлены элементы
# очереди, то столько же раз должен быть вызван task_done().
self.queue.join()

t=datetime.datetime.now()
test=Test(range(100), 20)
test.execute()
print 'the end in %s'%(datetime.datetime.now()-t)
# вывод debug информации
print len(list(test.output.__dict__['queue']))
print sorted(list(test.output.__dict__['queue']))
This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB