Форум сайта python.su
Есть такой код.
main.py
class PrWorker(multiprocessing.Process): def __init__(self, queue_in, queue_out): super(PrWorker, self).__init__() self.__queue_in = queue_in self.__queue_out = queue_out def run(self): while True: task = self.__queue_in.get() payl = Main4() result = payl.mainStart() try: for i in result: if i is not 'Error': self.__queue_out.put(i) self.__queue_in.task_done() else: self.__queue_out.put('Error') self.__queue_in.task_done() except Exception: self.__queue_out.put('Error') self.__queue_in.task_done() if __name__ =='__main__': allProc = [] q = queue.Queue() result = queue.Queue() for i in range(2): q.put(i) for i in range(1): print(i) rp = PrWorker(q, result) rp.start() allProc.append(rp) for p in allProc: p.join() out = [] while not result.empty(): out.append(result.get()) print('Final') print(out)
import queue import requests import threading class Worker(threading.Thread): def __init__(self, queue_in, queue_out): super(Worker, self).__init__() self.setDaemon(True) self.__queue_in = queue_in self.__queue_out = queue_out def run(self): """ Основной код здесь. """ while True: # Получаем задание из входящей очереди url = self.__queue_in.get() try: res = requests.get(url, timeout=1).status_code print(url + ' : ' + str(res)) if 'http://google.com' in url: self.__queue_out.put({'url': url, 'status_code': res}) self.__queue_in.task_done() else: self.__queue_in.task_done() except Exception: self.__queue_out.put('Error') self.__queue_in.task_done() class Main4(object): def buildSitesList(self): f = open('sites.txt', 'r').readlines() sites = [] for s in f: sites.append(s.rstrip()) return list(set(sites)) def mainStart(self): q = queue.Queue() # Создаем результирующую приоритетную очередь result = queue.Queue() # Заполняем входящую очередь данными for i in self.buildSitesList(): q.put(i) # Создаем и запускаем потоки for i in range(20): w = Worker(q, result) w.start() # Блокируем дальенейшее выполнение программы до тех пор пока потоки не обслужат все эелементы очереди q.join() # Формируем список как результат обработки изначального списка out = [] while not result.empty(): out.append(result.get()) print(out)
Отредактировано Mr.Anderson (Июль 28, 2017 03:32:13)
Офлайн
Добавьте логирование во все циклы “while”, скорее всего проблема именно там.
Офлайн
Mr.Anderson
полагаю, что когда в тред-воркере делается
url = self.__queue_in.get()
try: url = self.__queue_in.get(timeout=1) except Empty: # какой нибудь финалазер return
Офлайн