Python Thread / Queue проблема - PullRequest
       10

Python Thread / Queue проблема

2 голосов
/ 19 февраля 2010

Я создаю многопоточный скрипт на python, в котором есть коллекция файлов, помещаемых в очередь, а затем неизвестное количество потоков (по умолчанию 3) для начала загрузки. Когда каждый из потоков завершается, он обновляет стандартный вывод со статусом очереди и процентом. Все файлы загружаются, но информация о состоянии неверна в 3-м потоке, и я не уверен, почему. Я подумывал о создании очереди work_completed для использования в расчете, но не думаю, что мне это нужно / что это будет иметь значение. Может ли кто-нибудь указать мне правильное направление здесь?

download_queue = queue.Queue()

class Downloader(threading.Thread):
    def __init__(self,work_queue):
        super().__init__()
        self.current_job = 0
        self.work_queue = work_queue
        self.queue_size = work_queue.qsize()

    def run(self):
        while self.work_queue.qsize() > 0:
            url = self.work_queue.get(True)
            system_call = "wget -nc -q {0} -O {1}".format(url,local_file)
            os.system(system_call)
            self.current_job = int(self.queue_size) - int(self.work_queue.qsize())
            self.percent = (self.current_job / self.queue_size) * 100
            sys.stdout.flush()
            status = "\rDownloading " + url.split('/')[-1] + " [status: " + str(self.current_job) + "/" + str(self.queue_size) + ", " + str(round(self.percent,2)) + "%]"
        finally:
            self.work_queue.task_done()
def main:
    if download_queue.qsize() > 0:
        if options.active_downloads:
            active_downloads = options.active_downloads
        else:
            active_downloads = 3
        for x in range(active_downloads):
            downloader = Downloader(download_queue)
            downloader.start()
        download_queue.join()

Ответы [ 2 ]

4 голосов
/ 20 февраля 2010

Вы не можете проверить размер очереди в одном операторе, а затем .get() из очереди в следующем. Тем временем весь мир мог измениться. Вызов метода .get() - это единственная атомарная операция, которую вам нужно вызвать. Если он поднимает Empty или блокирует, очередь пуста.

Ваши темы могут перезаписывать вывод друг друга. Я хотел бы иметь другой поток с входной очередью, единственной задачей которого является печать элементов в очереди на стандартный вывод. Он также может отсчитывать количество выполненных предметов и получать информацию о состоянии.

Я также склоняюсь не к подклассу Thread, а вместо этого просто предоставляю простой Thread экземпляр с параметром target= и .start() поток.

на основе вашего ответа, попробуйте это:

download_queue = queue.Queue()


class Downloader(threading.Thread):
    def __init__(self,work_queue, original_size):
        super().__init__()
        self.current_job = 0
        self.work_queue = work_queue
        self.queue_size = original_size

    def run(self):
        while True:
            try:
                url = self.work_queue.get(False)
                system_call = "wget -nc -q {0} -O {1}".format(url,local_file)
                os.system(system_call)
                # the following code is questionable. By the time we get here,
                #   many other items may have been taken off the queue. 
                self.current_job = int(self.queue_size) - int(self.work_queue.qsize())
                self.percent = (self.current_job / self.queue_size) * 100
                sys.stdout.flush()
                status = ("\rDownloading " + url.split('/')[-1] + 
                          " [status: " + str(self.current_job) + 
                          "/" + str(self.queue_size) + ", " + 
                          str(round(self.percent,2)) + "%]" )            
            except queue.Empty:
                pass
            finally: 
                self.work_queue.task_done()




def main:
    if download_queue.qsize() > 0:
        original_size = download_queue.qsize()
        if options.active_downloads:
            active_downloads = options.active_downloads
        else:
            active_downloads = 3
        for x in range(active_downloads):
            downloader = Downloader(download_queue, original_size)
            downloader.start()
        download_queue.join()
2 голосов
/ 20 февраля 2010

Если вы хотите использовать модуль multiprocessing, он включает очень приятную параллель imap_unordered, которая уменьшит вашу проблему до очень элегантной:

import multiprocessing, sys

class ParallelDownload:
    def __init__(self, urls, processcount=3):
        self.total_items = len(urls)
        self.pool = multiprocessing.Pool(processcount)
        for n, status in enumerate(self.pool.imap_unordered(self.download, urls)):
            stats = (n, self.total_items, n/self.total_items)
            sys.stdout.write(status + " [%d/%d = %0.2f %%]\n"%stats)


    def download(self, url):
        system_call = "wget -nc -q {0} -O {1}".format(url, local_file)
        os.system(system_call)
        status = "\rDownloaded " + url.split('/')[-1]
        return status
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...