Найти - Пользователи
Полная версия: Создание очереди (queue) более быстрым способом - Как?
Начало » Python для экспертов » Создание очереди (queue) более быстрым способом - Как?
1 2
Spectral
Всем добрый вечер!
У меня возник вот такой вопрос: занимаюсь написанием программы, которая делает некоторые файловые операции на компьютере с помощью
тредов.
Обобщённый код описанной ситуации:
...
q = Queue() # создание очереди как объекта, к которому можно обращаться в Питоне
'''
здесь некоторый код, который строит очередь, например,
мы распарсили файловые имена папок с помощью re и os в список list,
а дальше queue строится добавлением элементов списка через цикл
'''
for item in list:
q.put(item)

'''
создание очереди закончено, хотим теперь в несколько потоков (например, 10) обработать очередь c помощью "Работников"
'''
def worker(q):
while True:
a = q.get()
print a
q.task_done()

for i in xrange(10):
t = Thread(target=worker, args=(q,))
t.daemon = True
t.start()
'''
ждем, пока очередь не опустеет
'''
q.join()
Собственно, суть вопроса -
получается так, что queue строится последовательно через цикл, через q.put(item)
если item'ов больше пяти тысяч, то это уже будет “бутылочным горлышком” в программе - очередь будет создаваться долго по времени.
В связи с этим, вопрос к экспертам - как можно построение queue сделать более быстро?

Спасибо за внимание заранее.
Александр Кошелев
Заполняйте очередь асинхронно, т.е. тоже в отдельном треде параллельно с обработкой элементов.
Spectral
Заполняйте очередь асинхронно, т.е. тоже в отдельном треде параллельно с обработкой элементов.
Правильно ли я понимаю, что это отдалённо напоминает подход producer / consumer?
Александр Кошелев
Spectral
Правильно ли я понимаю, что это отдалённо напоминает подход producer / consumer?
Можно сказать что это он. По сути он уже у вас реализован, только в синхронном виде.
Spectral
Ок. Вот до чего я додумался, благодаря Вашей подсказке. Именно это вы имели в виду?

'''
простой пример для понимания - есть список на 1000 чисел,
пусть печать каждого числа и будет задачей для потока
'''
list = range(0,1000)
#print list

from Queue import Queue
from threading import Thread
q = Queue()


def worker_consume(q):
'''
'''
while True:
print q.get()
q.task_done()

def worker_produce(q,list_of_items):
while True:
if len(list_of_items)>=2:
q.put(list_of_items.pop())
pass

for i in xrange(1,4):
if i % 2 ==0:
t = Thread(target=worker_produce, args=(q, list))
t.setDaemon(True)
t.start()
else:
t = Thread(target=worker_consume, args=(q,))
t.setDaemon(True)
t.start()
q.join()
В описанном примере рождаются два производителя и два потребителя.
Два процесса накачивают очередь, остальные два - делают “задачу”.
Александр Кошелев
Spectral
Именно это вы имели в виду?
Похоже
Spectral
Поправил ошибки из предыдущего кода, сделал, как у меня реализовано в программе:

'''
простой пример для понимания - есть список на 1000 чисел,
пусть печать каждого числа и будет задачей для потока
'''
list = range(0,1000)
#print list

from Queue import Queue
from threading import Thread
q = Queue()


def worker_consume(q):
'''
'''
while True:
'''
обращение get() к q должен быть однократным
поэтому если в качестве q загружается список или тапл, необходимо делать локальную копию объекта
и обращаться по индексам к ней.
в противном случае поток будет выхватывать по несколько заданий из очереди и , как показала практика, программа работает непредсказуемо
'''

q.get()
q.task_done()

def worker_produce(q,list_of_items):
'''
предыдущий вариант этой функции вёл к deadlock (может, и не дедлок, но программа просто переставала работать через некоторое время)
'''
while len(list_of_items)>0:
q.put(list_of_items.pop())
'''
действительно, поток формирования очереди "умирает", как только очередь закончена, иначе - висит
'''

for i in xrange(1,4):
if i % 2 ==0:
t = Thread(target=worker_produce, args=(q, list))
t.setDaemon(True)
t.start()
else:
t = Thread(target=worker_consume, args=(q,))
t.setDaemon(True)
t.start()
q.join()
Последнее, от чего хотелось бы предостеречь - может случиться так, что задания будут выполняться быстрее, через очередь будет загружаться.
Можно воткнуть time.sleep(0.5) в worker_consumer, как мне думается.

Спасибо Александру Кошелеву за конструктивный комментарий.

upd.: Извините :)
Александр Кошелев
Spectral
Спасибо Андрею Кошелеву за конструктивный комментарий.
Я всё-таки Саша.
Soteric
предыдущий вариант этой функции вёл к deadlock (может, и не дедлок, но программа просто переставала работать через некоторое время)
В предыдущем варианте
while True:
if len(list_of_items)>=2:
q.put(list_of_items.pop())
pass
не было выхода из цикла.
Isem
Вам не кажется, что нужно почитать Кнута и подумать?
This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB