Использование класса Queue в Python 2.6 - PullRequest
2 голосов
/ 16 апреля 2010

Давайте предположим, что я застрял на Python 2.6 и не могу обновить (даже если это поможет). Я написал программу, которая использует класс Queue. Мой продюсер - это простой каталог. Мои потребительские потоки извлекают файл из очереди и делают с ним что-нибудь. Если файл уже обработан, я пропускаю его. Обработанный список генерируется до запуска всех потоков, поэтому он не пустой.

Вот некоторый псевдокод.

import Queue, sys, threading

processed = []

def consumer():
    while True:
        file = dirlist.get(block=True)
        if file in processed:
            print "Ignoring %s" % file
        else:
            # do stuff here
        dirlist.task_done()

dirlist = Queue.Queue()

for f in os.listdir("/some/dir"):
    dirlist.put(f)

max_threads = 8

for i in range(max_threads):
    thr = Thread(target=consumer)
    thr.start()

dirlist.join()

Странное поведение, которое я получаю, заключается в том, что если поток встречает файл, который уже был обработан, поток останавливается и ожидает завершения всей программы. Я провел небольшое тестирование, и первые 7 потоков (при условии, что 8 - максимум) останавливаются, а 8-й поток продолжает обрабатывать по одному файлу за раз. Но, делая это, я теряю все основания для создания многопоточности приложения.

Я что-то не так делаю, или это ожидаемое поведение классов Queue / threading в Python 2.6?

Ответы [ 2 ]

2 голосов
/ 16 апреля 2010

Я попытался запустить ваш код и не увидел описанного вами поведения. Тем не менее, программа никогда не выходит. Я рекомендую изменить вызов .get() следующим образом:

    try:
        file = dirlist.get(True, 1)
    except Queue.Empty:
        return

Если вы хотите узнать, какой поток выполняется в данный момент, вы можете импортировать модуль thread и напечатать thread.get_ident () .

Я добавил следующую строку после .get():

    print file, thread.get_ident()

и получил следующий вывод:

bin 7116328
cygdrive 7116328
 cygwin.bat 7149424
cygwin.ico 7116328
 dev etc7598568
7149424
 fix 7331000
 home 7116328lib
 7598568sbin
 7149424Thumbs.db
 7331000
tmp 7107008
 usr 7116328
var 7598568proc
 7441800

Вывод грязный, потому что потоки пишут в стандартный вывод одновременно. Разнообразие идентификаторов потоков также подтверждает, что все потоки работают.

Возможно, что-то не так в реальном коде или в вашей методологии тестирования, но не в коде, который вы опубликовали?

1 голос
/ 16 апреля 2010

Поскольку эта проблема проявляется только при поиске файла, который уже был обработан, кажется, что это как-то связано с самим списком processed. Вы пытались реализовать простую блокировку? Например:

processed = []
processed_lock = threading.Lock()

def consumer():
    while True:
        with processed_lock.acquire():
            fileInList = file in processed
        if fileInList:
            # ... et cetera

Потоки имеют тенденцию вызывать самые странные ошибки, даже если они кажутся, что они "не должны" происходить. Использование блокировок для общих переменных - это первый шаг, чтобы убедиться, что вы не столкнетесь с какими-то условиями гонки, которые могут привести к взаимоблокировке потоков.


Конечно, если то, что вы делаете в # do stuff here, сильно загружает процессор, то Python все равно будет запускать код только из одного потока за раз из-за Глобальной блокировки интерпретатора. В этом случае вы можете переключиться на модуль multiprocessing - он очень похож на threading, хотя вам придется заменить общие переменные на другое решение (подробности см. здесь ).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...