Форум сайта python.su
Есть такой эвент луп:
if __name__ == '__main__': loop = asyncio.get_event_loop() # taskqueue = asyncio.Queue(maxsize=100) loop.run_until_complete( asyncio.gather( crawler.proto_authors_source(loop), crawler.fetcher_task(), crawler.parser_task(), crawler.dbwriter_task(loop), crawler.timeout_control(loop) ) )
async def timeout_control(loop, timeout=20, check_period=10): logger.info("TIMEOUT CONTROL UP") while True: await asyncio.sleep(check_period) if taskq.empty() and parserq.empty() and dbwq.empty() and _source_end: # taskq и прочие - очереди проброшенные между корутинами, _source_end - флаг завершения корутины - источника данных logger.info("TIMEOUT CONTROL | waiting for completion...") await loop.shutdown_asyncgens() await asyncio.sleep(timeout) logger.info("TIMEOUT CONTROL | close event loop") loop.stop() break logger.info("TIMEOUT CONTROL DOWN")
INFO:crawler:DBWRITER UP
INFO:crawler:TIMEOUT CONTROL UP
INFO:crawler:PARSER UP
INFO:crawler:FETCHER UP
INFO:crawler:SOURCE AuthorsList UP
INFO:crawler:SOURCE DOWN
INFO:crawler:TIMEOUT CONTROL | waiting for completion...
INFO:crawler:TIMEOUT CONTROL | close event loop
INFO:crawler:TIMEOUT CONTROL DOWN
Traceback (most recent call last):
File "D:/google-drive/Dropbox/samlibcrawler/run.py", line 21, in <module>
crawler.timeout_control(loop)
File "C:\Program Files\Python36\Lib\asyncio\base_events.py", line 465, in run_until_complete
raise RuntimeError('Event loop stopped before Future completed.')
RuntimeError: Event loop stopped before Future completed.
Офлайн
kazaffСмысл асинхронности в том что вы гарантируете отсутствие зависаний в обработчиках (это просто необходимо) и управляемость всех примитивов синхронизации. Если это не так то вы выбрали не ту архитектуру и добро пожаловать назад в вытесняющую многозадачность с тредами.
Иногда случается, что данных от proto_authors_source нет
Офлайн
Я использую флаги для синхронизации.
https://pastebin.com/wWtA8b6i
Но обычно такие проблемы вызваны неправильно придуманной логикой программы.Согласен, но я не могу придумать как сделать так, чтобы обойтись без:
_link, html = await parserq.get()
if taskq.empty(): await asyncio.sleep(timeout) if taskq.empty(): return
https://docs.python.org/3/library/asyncio-dev.html#cancellationМда, мудрость из веков - RTFM. Вроде подходит.
Отредактировано kazaff (Янв. 21, 2018 11:23:40)
Офлайн
может стоит тут изложить архитектуру приложения. Допускаю что вам предложат более красивое решение. По отдельным фрагментам ничего не посоветуешь.
Офлайн
Все приложение - банальный парсер самиздата. Из заранее напарсенной бд с линками приложение берет сслыки, скачивает страницу, затем проверяет счетчики на странице и заносит в бд. Все.
Всего есть 4 корутины:
источник данных (
берет данные из mariadb,
получает по одному адресу из курсора вызывая fetchone(),
кладет данные в очередь;
после завершения флаг _source_end становится True
)
fetcher (
в бесконечном цикле ждет:
данные из очереди источника
скачивает и передает в очередь для парсера
если очередь пуста и флаг _source_end поднят,
то выйти и поднять свой флаг завершения _fetcher_end
)
парсер (
так же в бесконечном цикле:
ждет данные из очереди от фетчера
парсит и полученные данные упакованные в кортеж передает следующей очереди
если очередь пуста и флаг _fetcher_end поднят,
то выйти и поднять свой флаг завершения _parser_end
)
база данных (
так же в бесконечном цикле:
ждет данные из очереди от парсера
формирует SQL-запрос и обращается к бд
если очередь пуста и флаг _parser_end поднят,
то выйти
)
Как именно парсер работает особого значения не имеет, все более чем примитивно.
Запускается все одновременно с помощью gather, в loop.run_until_complete.
Я вижу, что если немного модифицировать получение данных из очередей или добавить таймаут, то зависания при отсутствии данных прекратятся, но может можно как-то более цивильно и проще все соединить?
Отредактировано kazaff (Янв. 21, 2018 17:16:04)
Офлайн