Найти - Пользователи
Полная версия: Многопоточные сетевые приложения замедляются после промежутка времени
Начало » Python для экспертов » Многопоточные сетевые приложения замедляются после промежутка времени
1 2 3
deye
Здравствуйте.

Уже не раз сталкиваюсь со следующей проблемой:

Многопоточное приложение, которое работает с сетью (проявлялось как при работе с http (для работы использовал фреймворк grab), так и с smtp (использовался smtplib)) через определённый промежуток времени (обычно этот промежуток времени примерно одинаковый, около пару часов) начинает замедляться.

Пример:
Приложение многопоточно заходит на сайт, заполняет форму, сабмитит её и получает результат.
Архитектура проста: есть очередь (Queue) куда изначально закидываются сразу все задания(данные для формы), потоки забирают задания из очереди и пробуют выполнить его. Если выполнилось - записываем результат и переходим к следующему заданию, иначе (например возникла какая-либо сетевая ошибка, таймаут и прочее) - закидываем задание заново в очередь.

Первые пару часов всё работает идеально, но после скорость работы падает примерно раз в 10. При этом никаких несловленных исключений не выпадает, логи пишутся, всё работает в штатном режиме.

Провералось на ОС Windows. Версия pyhon - 2.7.

Пробовал следующие:
1) Увеличивал количесто полу-открытых соединений Windows
2) Пробовал использовать прокси
3) Увеличивал количество возможных открытых портов Windows
4) Убедился (используя сокет сниффер), что сокеты продолжают успешно открываться после того как скорость программы падает.

Замечено что перезапуск приложения решает проблему.

В чём может быть дело? Куда копать?
Lexander
Память в течении часа постоянно растет?
Возможно, только до определенного уровня и потом рост замедляется или останавливается.
Если, смотрите в сторону утечек памяти, перенастройте GC под ваше приложение, принудительно убивайте все объекты или используйте weakref.
plusplus
А многопоточность не с помощью grab.tools.work организована? Попробуй с ней, может проблемы пропадут.
lorien
Не совсем понятно как организованы потоки. Есть две версии причин замедления:
1) Создаётся пул потоков, со временем часть потоков отваливается по фатальному эксепшну. Это легко выясняется заворачиванием всего кода внутри потока в глобальный try/except и записью объекта исключения в лог.
2) В некоторых потоках возникает операция зависания, например, при использовании urllib и невыставленном timeout может возникнуть такой случай, что попытка соеденения будет длиться вечно. Но если вы используете Grab, то там должен network timeout exception выскакивать.

В любом случае проблема решается подробным логированием, логировать надо всё, особенно важно номер потока и входные параметры задания. Если что-то зависает или падает, будет видно, посе какого задания в логах пропали сообщения от потока.

Ну и конечно могут быть ещё куча разных причин, вплоть до медленного хранилища, куда вы пишите данные.
deye
Lexander, пробовал дебажить с опцией gc.DEBUG_UNCOLLECTABLE - никаких объектов не показало. Как я понимаю, это значит что сборщик мусора без проблем очищает объекты.

plusplus, пока не пробовал, но код make_work реализован схоже с моим изначальным

lorien, как я писал выше, никаких несловленных исключений не возникает. я логировал работу каждого потока и, после того момента как программа начинает замедляться, логи всех потоков по прежнему ведутся, а следовательно потоки работают, в логах необработанных исключений не возникает, дедлоков нигде не обнаружил. В качестве сетевой библиотеки использую grab, поэтому бесконечный таймаут не возникает. Насчёт медленного хранилища - результаты поток записывает сразу в .txt файл.
Lexander
deye
Lexander, пробовал дебажить с опцией gc.DEBUG_UNCOLLECTABLE - никаких объектов не показало. Как я понимаю, это значит что сборщик мусора без проблем очищает объекты.
Если вашу попытку дебажить считать положительным ответом на мой вопрос о росте памяти, то как проблемы есть.
gc.DEBUG_SAVEALL включен?

А вообще, без хоть какого-либо кода и результата вывода дальше трудно и бесполезно подсказывать.
Lexander
Вы потоки убиваете и создаете заново или они всегда существуют?
deye
gc.DEBUG_SAVEALL не пробовал. Насчёт памяти да, постоянный рост есть.
Код, к сожалению, не могу выложить в общий доступ.
Потоки создаются единожды и существуют всё время, до тех пор пока все задания не выполнятся.
deye
Организована работа потоков следующим образом (упрощённый пример):
# заполняем очередь заданий
for task in tasks:
    tasks_queue.put(task)
# пишем функцию, которую будет выполнять каждый поток
def worker():
    while True:
        try:
            task = tasks_queue.get_nowait()
        except Queue.Empty:
            break
        result = process_task(task) # Функция process_task выполняет
                                    # несколько сетевых запросов.
                                    # Все исключения ловятся
        if result_is_good(result):
            # если работа выполнилась успешно - записываем результат
            LOCK.aquire()
            with open('results.txt','a') as out:
                out.write(result)
            LOCK.release()
        else:
            # если выпал эксепшн - закидываем задачу обратно в очередь
            tasks_queue.put(task)
# запускаем потоки
for i in range(threads_qty):
    thread = threading.Thread(target=worker)
    thread.start()
            
        
Lexander
С локами у вас потенциальный конфликт.
Если захват не произошел, то как поведет себя следующий код?

Освобождение захвата применяется к последнему локу, выставленному из любого потока.

Рекомендую переписать этот участок кода.
Сделайте очередь результатов, из потоков результат сохраняйте в очередь и пусть отдельный поток эксклюзивно работает с файлом. Локи не понадобятся.

Добавьте анализ состояния потоков с помощью threading.settrace().

deye
# Все исключения ловятся
Исходя из кода я все таки спрошу:
Вы уверены в этом?
И что правильно ловятся, с уничтожением объектов?
This is a "lo-fi" version of our main content. To view the full version with more information, formatting and images, please click here.
Powered by DjangoBB