Уведомления

Группа в Telegram: @pythonsu

#1 Янв. 20, 2018 19:56:26

kazaff
От:
Зарегистрирован: 2011-08-12
Сообщения: 26
Репутация: +  2  -
Профиль   Отправить e-mail  

Корректно завершить event loop

Есть такой эвент луп:

 if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # taskqueue = asyncio.Queue(maxsize=100)
    loop.run_until_complete(
        asyncio.gather(
            crawler.proto_authors_source(loop),
            crawler.fetcher_task(),
            crawler.parser_task(),
            crawler.dbwriter_task(loop),
            crawler.timeout_control(loop)
        )
    )
Корутины proto_authors_source(loop) fetcher_task() parser_task() dbwriter_task(loop) общаются через очереди. Иногда случается, что данных от proto_authors_source нет и парсер, фетчер и бд навсегда await данные из очереди. Чтобы избежать зависаний я свелосипедил такую корутину:
 async def timeout_control(loop, timeout=20, check_period=10):
    logger.info("TIMEOUT CONTROL UP")
    while True:
        await asyncio.sleep(check_period)
        if taskq.empty() and parserq.empty() and dbwq.empty() and _source_end:  
        # taskq и прочие - очереди проброшенные между корутинами, _source_end - флаг завершения корутины - источника данных
            logger.info("TIMEOUT CONTROL | waiting for completion...")
            await loop.shutdown_asyncgens()
            await asyncio.sleep(timeout)
            logger.info("TIMEOUT CONTROL | close event loop")
            loop.stop()
            break
    logger.info("TIMEOUT CONTROL DOWN")

Велосипед даже едет, но подпрыгивает на этом месте.
INFO:crawler:DBWRITER UP
INFO:crawler:TIMEOUT CONTROL UP
INFO:crawler:PARSER UP
INFO:crawler:FETCHER UP
INFO:crawler:SOURCE AuthorsList UP
INFO:crawler:SOURCE DOWN
INFO:crawler:TIMEOUT CONTROL | waiting for completion...
INFO:crawler:TIMEOUT CONTROL | close event loop
INFO:crawler:TIMEOUT CONTROL DOWN
Traceback (most recent call last):
File "D:/google-drive/Dropbox/samlibcrawler/run.py", line 21, in <module>
crawler.timeout_control(loop)
File "C:\Program Files\Python36\Lib\asyncio\base_events.py", line 465, in run_until_complete
raise RuntimeError('Event loop stopped before Future completed.')
RuntimeError: Event loop stopped before Future completed.

Все-таки, как верно завершать эвент луп?



Офлайн

#2 Янв. 21, 2018 08:36:25

doza_and
От:
Зарегистрирован: 2010-08-15
Сообщения: 4138
Репутация: +  252  -
Профиль   Отправить e-mail  

Корректно завершить event loop

kazaff
Иногда случается, что данных от proto_authors_source нет
Смысл асинхронности в том что вы гарантируете отсутствие зависаний в обработчиках (это просто необходимо) и управляемость всех примитивов синхронизации. Если это не так то вы выбрали не ту архитектуру и добро пожаловать назад в вытесняющую многозадачность с тредами.

Думаю у вас все не так печально. Один из способов это отмена задач
https://docs.python.org/3/library/asyncio-dev.html#cancellation
https://stackoverflow.com/questions/42231161/asyncio-gather-vs-asyncio-wait
Второй способ - Ставить задачи получения данных с timeout.
Третий сделать так чтобы получатели данных ждали специальное событие -или данные или ваш сигнал об окончании работы. Возбуждаете сигнал - все завершается.

Но обычно такие проблемы вызваны неправильно придуманной логикой программы.





Офлайн

#3 Янв. 21, 2018 11:22:56

kazaff
От:
Зарегистрирован: 2011-08-12
Сообщения: 26
Репутация: +  2  -
Профиль   Отправить e-mail  

Корректно завершить event loop

Я использую флаги для синхронизации.
https://pastebin.com/wWtA8b6i

Но обычно такие проблемы вызваны неправильно придуманной логикой программы.
Согласен, но я не могу придумать как сделать так, чтобы обойтись без:
  _link, html = await parserq.get()
такого ожидания данных из очереди. Если данные есть, то все корректно отрабатывает, если нет, то вечно ожидает.
Еще можно сообразить что-либо вроде:
 if taskq.empty():
    await asyncio.sleep(timeout)
    if taskq.empty():
        return
Не могу отделать от мысли, что строю велосипед из сломанных костылей.

https://docs.python.org/3/library/asyncio-dev.html#cancellation
Мда, мудрость из веков - RTFM. Вроде подходит.



Отредактировано kazaff (Янв. 21, 2018 11:23:40)

Офлайн

#4 Янв. 21, 2018 15:13:08

doza_and
От:
Зарегистрирован: 2010-08-15
Сообщения: 4138
Репутация: +  252  -
Профиль   Отправить e-mail  

Корректно завершить event loop

может стоит тут изложить архитектуру приложения. Допускаю что вам предложат более красивое решение. По отдельным фрагментам ничего не посоветуешь.



Офлайн

#5 Янв. 21, 2018 17:13:24

kazaff
От:
Зарегистрирован: 2011-08-12
Сообщения: 26
Репутация: +  2  -
Профиль   Отправить e-mail  

Корректно завершить event loop

Все приложение - банальный парсер самиздата. Из заранее напарсенной бд с линками приложение берет сслыки, скачивает страницу, затем проверяет счетчики на странице и заносит в бд. Все.

Всего есть 4 корутины:
источник данных (
берет данные из mariadb,
получает по одному адресу из курсора вызывая fetchone(),
кладет данные в очередь;
после завершения флаг _source_end становится True
)
fetcher (
в бесконечном цикле ждет:
данные из очереди источника
скачивает и передает в очередь для парсера
если очередь пуста и флаг _source_end поднят,
то выйти и поднять свой флаг завершения _fetcher_end
)
парсер (
так же в бесконечном цикле:
ждет данные из очереди от фетчера
парсит и полученные данные упакованные в кортеж передает следующей очереди
если очередь пуста и флаг _fetcher_end поднят,
то выйти и поднять свой флаг завершения _parser_end
)
база данных (
так же в бесконечном цикле:
ждет данные из очереди от парсера
формирует SQL-запрос и обращается к бд
если очередь пуста и флаг _parser_end поднят,
то выйти

)

Как именно парсер работает особого значения не имеет, все более чем примитивно.
Запускается все одновременно с помощью gather, в loop.run_until_complete.

Я вижу, что если немного модифицировать получение данных из очередей или добавить таймаут, то зависания при отсутствии данных прекратятся, но может можно как-то более цивильно и проще все соединить?



Отредактировано kazaff (Янв. 21, 2018 17:16:04)

Офлайн

Board footer

Модераторировать

Powered by DjangoBB

Lo-Fi Version