Уведомления

Группа в Telegram: @pythonsu

#1 Окт. 20, 2010 18:30:47

grundic
От:
Зарегистрирован: 2010-04-25
Сообщения: 6
Репутация: +  0  -
Профиль   Отправить e-mail  

Dynamic generators

Привет!

Мне нужно работать с генераторами, что бы отдавать chunked запрос браузеру. Я делаю это так:

for line in data:
yield line
Это просто.
Теперь у меня есть некая блокирующая функция, которая что-то делает. Пока она чот-то делает, результат ее работы можно получить через callback.
Внимание, вопрос: как мне через callback функцию создать генератор?

Написал такой код, но он блокирующий:
from multiprocessing import Queue, Process
import random
from time import sleep


qq = Queue()

def fill_the_qq():
for x in xrange(10):
qq.put(x)
sleep(0.3)

def generator_func():
print("*** generator_func started ***")
while not qq.empty():
item = qq.get()
yield item
else:
print "empty :("


if __name__ == '__main__':

fill_the_qq()

for line in generator_func():
print line

raw_input("Press any key to continue")
Я пришел к идее, что блокирующую функцию можно запустить в потоке. И пока она не выполнится, в цикле считывать данные и возвращать их. Callback в данном случае будет наполнять данными некий объект.

получилось такое и оно не работает:
from multiprocessing import Queue, Process
import random
from time import sleep


qq = Queue()

def fill_the_qq():
for x in xrange(10):
print "putting {x} item to queue...".format(x=x)
qq.put(x)
sleep(0.3)

def generator_func():
print("*** generator_func started ***")
while not qq.empty():
item = qq.get()
yield item
else:
print "empty :("


if __name__ == '__main__':

p = Process(target=fill_the_qq,)
p.start()

for line in generator_func():
print line

raw_input("Press any key to continue")
Здесь я пытаюсь “накормить” генератор значениями из очереди. Но почему-то генератор возвращает None.
Возможно, кто-то сможет предложить более правильное решение.

Спасибо.



Офлайн

#2 Окт. 20, 2010 18:56:35

zheromo
От:
Зарегистрирован: 2010-10-02
Сообщения: 356
Репутация: +  2  -
Профиль   Отправить e-mail  

Dynamic generators

Очередь опустошиться раньше, чем генератор вернет значение из нее, может статься что и вначале она пуста.

Попробуйте так

def fill_the_qq():
for x in xrange(10):
print "putting {x} item to queue...".format(x=x)
qq.put(x)
sleep(0.3)
qq.put('STOP')

def generator_func():
print("*** generator_func started ***")
while True:
item = qq.get()
if item == 'STOP':
print "empty :("
break
yield item



Офлайн

#3 Окт. 20, 2010 22:43:28

grundic
От:
Зарегистрирован: 2010-04-25
Сообщения: 6
Репутация: +  0  -
Профиль   Отправить e-mail  

Dynamic generators

Большое спасибо за ответ.

Попробовал ваш вариант - почему-то зависает в самом конце :(

На самом деле мне нужно работать с pysvn. На агенте планируется экспортировать определенные папки и в ходе выполнения хочется видеть прогресс. У pysvn есть удобные callback для отображения данных в консоли, но в моем случае требуется “динамический” генератор (не знаю, есть ли такой термин).

Когда я запускаю pysvn.export - работа программы блокируется. Поэтому делаем эту операцию в потоке. Но вот как скормить данные из callback в генератор - я не знаю :( Прочитал о multiprocessing, но испробованные там варианты не работают - callback функция не хочет писать в queue, Namespace Manager. Была идея создать Connection Objects - клиент/сервер, но показалось уж очень сложным для данной задачи. В итоге остановился на точках - в процессе выполнения будет хоть какая-то активность.

import pysvn
import datetime
from time import sleep
from multiprocessing import Process, Queue, Manager

qq = Queue()

def do_export():
client = pysvn.Client()
url = "svn://vault/trunk/folder/"
print "Exporting {url}".format(url=url)
client.callback_notify = notify_export
revision = client.export(url, r'D:\tezt', force=True).number
print "\nExported at {rev}".format(rev=revision)

def notify_export(event_dict):
data = "{action} -> {path}".format(action=event_dict.get('action'), path=event_dict.get('path'))
qq.put(data)
#print data

def infinit_gena():
print " *** thread gena started ***"
p = Process(target=do_export,)
p.start()

while True:
yield '.'
sleep(0.2)
if not p.is_alive(): break


if __name__ == '__main__':

for i in infinit_gena():
print i,

print "qq is empty? => {qq}".format(qq=qq.empty())
raw_input("\nPress any key to continue")
Получаю такой output:
 *** thread gena started ***
.Exporting svn://vault/trunk/folder/
. . . . . . . . . . . . . . . . . . . . . . . . .
Exported at 6388
qq is empty? => True

Press any key to continue
Возможно, кто-то сможет помочь мне. Хочется из функции notify_export, которая вызывается в процессе экспорта передавать значения в какой-то объект - в принципе любой - что бы потом можно было вытащить данные в генераторе.



Отредактировано (Окт. 20, 2010 22:54:14)

Офлайн

#4 Окт. 21, 2010 02:25:02

py.user.next
От:
Зарегистрирован: 2010-04-29
Сообщения: 9878
Репутация: +  853  -
Профиль   Отправить e-mail  

Dynamic generators

from multiprocessing import Queue, Process
import random
from time import sleep


qq = Queue()

def fill_the_qq():
for x in xrange(10):
print "putting {x} item to queue...".format(x=x)
qq.put(x)
sleep(0.3)

def generator_func():
print("*** generator_func started ***")
while not qq.empty():
item = qq.get()
yield item
else:
print "empty :("


if __name__ == '__main__':

p = Process(target=fill_the_qq)
p.start()
p.join()

print("loop started")
for line in generator_func():
print line

raw_input("Press any key to continue")
чуть прояснил, оказалось, что у тебя процесс не успевал занести в очередь (главный уже заканчивался, а занесение только начиналось)



Офлайн

#5 Окт. 21, 2010 08:39:08

zheromo
От:
Зарегистрирован: 2010-10-02
Сообщения: 356
Репутация: +  2  -
Профиль   Отправить e-mail  

Dynamic generators

py.user.next
чуть прояснил, оказалось, что у тебя процесс не успевал занести в очередь (главный уже заканчивался, а занесение только начиналось)
Об этом выше я писал

можно либо использовать значение, которе говорит что очередь пуста (см. мой пример выше) либо таймоуты

def generator_func():    
print("*** generator_func started ***")
while True:
try:
item = qq.get(timeout)
except Empty:
print "empty :("
break
else:
yield item



Офлайн

#6 Окт. 21, 2010 08:40:44

zheromo
От:
Зарегистрирован: 2010-10-02
Сообщения: 356
Репутация: +  2  -
Профиль   Отправить e-mail  

Dynamic generators

py.user.next
чуть прояснил, оказалось, что у тебя процесс не успевал занести в очередь (главный уже заканчивался, а занесение только начиналось)
Об этом выше я и писал

zheromo
Очередь опустошиться раньше, чем генератор вернет значение из нее, может статься что и вначале она пуста.
можно либо использовать значение, которе говорит что очередь пуста (см. мой пример выше) либо таймоуты

def generator_func():    
print("*** generator_func started ***")
while True:
try:
item = qq.get(timeout)
except Empty:
print "empty :("
break
else:
yield item



Офлайн

#7 Окт. 21, 2010 09:57:16

py.user.next
От:
Зарегистрирован: 2010-04-29
Сообщения: 9878
Репутация: +  853  -
Профиль   Отправить e-mail  

Dynamic generators

zheromo
Очередь опустошиться раньше, чем генератор вернет значение из нее
нет, там как раз очередь даже не успеет заполниться, потому что главный процесс закончится к тому времени
то есть она даже не начнёт опустошаться, потому что некому будет её опустошать



Офлайн

#8 Окт. 21, 2010 18:16:19

zheromo
От:
Зарегистрирован: 2010-10-02
Сообщения: 356
Репутация: +  2  -
Профиль   Отправить e-mail  

Dynamic generators

Цитирую полностью :)

zheromo
Очередь опустошиться раньше, чем генератор вернет значение из нее, может статься что и вначале она пуста.
я имел в виду, что даже если повезет и вначале в очереди будет элемент, то она все равно опустошиться…



Офлайн

#9 Окт. 22, 2010 01:50:44

py.user.next
От:
Зарегистрирован: 2010-04-29
Сообщения: 9878
Репутация: +  853  -
Профиль   Отправить e-mail  

Dynamic generators

from multiprocessing import Queue, Process
import random
from time import sleep


qq = Queue()

def fill_the_qq():
for x in xrange(10):
print "putting {x} item to queue...".format(x=x)
qq.put(x)
sleep(0.3)

def generator_func():
print("*** generator_func started ***")
while not qq.empty():
item = qq.get()
yield item
else:
print "empty :("


if __name__ == '__main__':

p = Process(target=fill_the_qq)
p.start()
#p.join()

print("loop started")
for line in generator_func():
print line
print("loop ended")

raw_input("Press any key to continue")
[guest@localhost tests]$ python t.py
loop started
*** generator_func started ***
empty :(
loop ended
Press any key to continueputting 0 item to queue...
putting 1 item to queue...
putting 2 item to queue...
putting 3 item to queue...
putting 4 item to queue...
putting 5 item to queue...
putting 6 item to queue...
putting 7 item to queue...
putting 8 item to queue...
putting 9 item to queue...

[guest@localhost tests]$
как видно из данного вывода, цикл полностью отрабатывает к моменту занесения первого элемента в очередь

>>> def f(n):
... for i in range(1, n + 1):
... if i == 5:
... break
... yield i
...
>>> for i in f(10):
... print(i)
...
1
2
3
4
>>>
а тут видно, что восстановления цикла не происходит

у него в коде уже есть
while not qq.empty():
чем это отличается от предложенного
        if item == 'STOP':
print "empty :("
break
?

а может он вообще хочет, чтобы добавление и вынимание элементов из очереди происходили одновременно
тогда, возможно, ему нужно зациклить цикл (чтобы он повторялся, запуская генераторную функцию снова)



Офлайн

#10 Окт. 22, 2010 01:52:21

py.user.next
От:
Зарегистрирован: 2010-04-29
Сообщения: 9878
Репутация: +  853  -
Профиль   Отправить e-mail  

Dynamic generators

raw_input("Press any key to continue...\n")
а то сливается нулевой элемент



Офлайн

Board footer

Модераторировать

Powered by DjangoBB

Lo-Fi Version