Форум сайта python.su
Всем добрый вечер!
У меня возник вот такой вопрос: занимаюсь написанием программы, которая делает некоторые файловые операции на компьютере с помощью
тредов.
Обобщённый код описанной ситуации:
...
q = Queue() # создание очереди как объекта, к которому можно обращаться в Питоне
'''
здесь некоторый код, который строит очередь, например,
мы распарсили файловые имена папок с помощью re и os в список list,
а дальше queue строится добавлением элементов списка через цикл
'''
for item in list:
q.put(item)
'''
создание очереди закончено, хотим теперь в несколько потоков (например, 10) обработать очередь c помощью "Работников"
'''
def worker(q):
while True:
a = q.get()
print a
q.task_done()
for i in xrange(10):
t = Thread(target=worker, args=(q,))
t.daemon = True
t.start()
'''
ждем, пока очередь не опустеет
'''
q.join()
Офлайн
Заполняйте очередь асинхронно, т.е. тоже в отдельном треде параллельно с обработкой элементов.
Офлайн
Заполняйте очередь асинхронно, т.е. тоже в отдельном треде параллельно с обработкой элементов.Правильно ли я понимаю, что это отдалённо напоминает подход producer / consumer?
Офлайн
SpectralМожно сказать что это он. По сути он уже у вас реализован, только в синхронном виде.
Правильно ли я понимаю, что это отдалённо напоминает подход producer / consumer?
Офлайн
Ок. Вот до чего я додумался, благодаря Вашей подсказке. Именно это вы имели в виду?
'''
простой пример для понимания - есть список на 1000 чисел,
пусть печать каждого числа и будет задачей для потока
'''
list = range(0,1000)
#print list
from Queue import Queue
from threading import Thread
q = Queue()
def worker_consume(q):
'''
'''
while True:
print q.get()
q.task_done()
def worker_produce(q,list_of_items):
while True:
if len(list_of_items)>=2:
q.put(list_of_items.pop())
pass
for i in xrange(1,4):
if i % 2 ==0:
t = Thread(target=worker_produce, args=(q, list))
t.setDaemon(True)
t.start()
else:
t = Thread(target=worker_consume, args=(q,))
t.setDaemon(True)
t.start()
q.join()
Отредактировано (Янв. 9, 2012 22:31:33)
Офлайн
SpectralПохоже
Именно это вы имели в виду?
Офлайн
Поправил ошибки из предыдущего кода, сделал, как у меня реализовано в программе:
'''
простой пример для понимания - есть список на 1000 чисел,
пусть печать каждого числа и будет задачей для потока
'''
list = range(0,1000)
#print list
from Queue import Queue
from threading import Thread
q = Queue()
def worker_consume(q):
'''
'''
while True:
'''
обращение get() к q должен быть однократным
поэтому если в качестве q загружается список или тапл, необходимо делать локальную копию объекта
и обращаться по индексам к ней.
в противном случае поток будет выхватывать по несколько заданий из очереди и , как показала практика, программа работает непредсказуемо
'''
q.get()
q.task_done()
def worker_produce(q,list_of_items):
'''
предыдущий вариант этой функции вёл к deadlock (может, и не дедлок, но программа просто переставала работать через некоторое время)
'''
while len(list_of_items)>0:
q.put(list_of_items.pop())
'''
действительно, поток формирования очереди "умирает", как только очередь закончена, иначе - висит
'''
for i in xrange(1,4):
if i % 2 ==0:
t = Thread(target=worker_produce, args=(q, list))
t.setDaemon(True)
t.start()
else:
t = Thread(target=worker_consume, args=(q,))
t.setDaemon(True)
t.start()
q.join()
Отредактировано (Янв. 11, 2012 13:09:19)
Офлайн
SpectralЯ всё-таки Саша.
Спасибо Андрею Кошелеву за конструктивный комментарий.
Офлайн
предыдущий вариант этой функции вёл к deadlock (может, и не дедлок, но программа просто переставала работать через некоторое время)В предыдущем варианте
while True:
if len(list_of_items)>=2:
q.put(list_of_items.pop())
pass
Офлайн
Вам не кажется, что нужно почитать Кнута и подумать?
Офлайн