Требуется проверка: класс Clearable Python Queue - PullRequest
2 голосов
/ 18 марта 2011

Поскольку я не специалист по Python и многопоточному программированию, я хотел бы спросить вас, правильна ли моя реализация.

Моя цель состояла в том, чтобы расширить класс Queue, чтобы его можно было очистить.И удаленные предметы должны быть возвращены.Это все.Моя реализация:

import Queue

class ClearableQueue(Queue.Queue):

    def __init__(self, maxsize):
        Queue.Queue.__init__(self, maxsize)

    def clear(self):
        self.mutex.acquire()

        copyOfRemovedEntries = list(self.queue)
        self.queue.clear()
        self.unfinished_tasks = 0
        self.all_tasks_done.notifyAll()
        self.not_full.notifyAll()

        self.mutex.release()

        return copyOfRemovedEntries

Это правильно?Спасибо.

Обновление: к сожалению, эта реализация по-прежнему недостаточна, так как task_done может вызвать исключение ValueError после вызова clear ().

Точнее: считается, что очередь используется вмногопоточная среда.Поэтому предположим, что один производитель и один рабочий поток (но вы также можете рассмотреть больше потоков).Обычно, если рабочий поток вызывает get (), после вызова работы должен вызываться task_done ().Если это происходит таким образом, то может случиться так, что поток производителя по какой-то причине вызовет clear (), сразу после того, как рабочий поток вызвал get () и до вызова task_done ().Однако пока это работает, если рабочий поток хотел бы вызвать task_done (), тогда будет сгенерировано исключение.Это связано с тем, что task_done () проверяет количество незавершенных задач, проверяя unfinished_tasks класса Queue.

Было бы интересно, если бы эта проблема могла быть решена только классом ClearableQueue, чтобы можно было вызывать метод clear ()без забот.Или, если должно быть что-то другое, что каким-то образом контролирует вызовы метода.

На самом деле, в моем конкретном случае я не использую метод join (), поэтому мне не нужно вызывать task_done ().Тем не менее, я хотел бы сделать эту функцию полной.Может быть полезно и другим людям.

Ответы [ 2 ]

3 голосов
/ 18 марта 2011

Если вы посмотрите на источник , вы увидите, что стандартный способ доступа к мьютексу обертывает мутирующий код в блок try: finally на случай, если что-то пойдет не так:

import Queue

class ClearableQueue(Queue.Queue):

    def __init__(self, maxsize):
        Queue.Queue.__init__(self, maxsize)

    def clear(self):
        self.mutex.acquire()

        copyOfRemovedEntries = None
        try:
            copyOfRemovedEntries = list(self.queue)
            self.queue.clear()
            self.unfinished_tasks = 0
            self.all_tasks_done.notifyAll()
            self.not_full.notifyAll()
        finally:
            self.mutex.release()

        return copyOfRemovedEntries

Редактировать 1

Если вы беспокоитесь об исключениях броска второго потока при выполнении get(), тогда task_done() почему бы просто не обернуть task_done() в блок try-catch? Все, что говорит вам об этом исключении, это то, что вы подтвердили, что вы подтвердили слишком много элементов, но если ваша функция очистки уже позаботилась о них, в чем проблема?

Это скрыло бы это исключение, если оно вас беспокоило, сделало назначение функций более очевидным и удалило двойное назначение списка в моем предыдущем примере:

class ClearableQueue(Queue.Queue):

    def __init__(self, maxsize):
        Queue.Queue.__init__(self, maxsize)

    def get_all(self)
        self.mutex.acquire()

        try:
            copyOfRemovedEntries = list(self.queue)
            self.queue.clear()
            self.unfinished_tasks = 0
            self.all_tasks_done.notifyAll()
            self.not_full.notifyAll()
        finally:
            self.mutex.release()

        return copyOfRemovedEntries

    def clear(self):
        self.get_all()

    def task_done(self):
        try:
            Queue.Queue.task_done(self)
        except ValueError:
            pass

Редактировать 2

Как насчет этого как еще более эффективного решения, которое ничего не скрывает:

class ClearableQueue(Queue.Queue):

    def __init__(self, maxsize):
        Queue.Queue.__init__(self, maxsize)
        self.tasks_cleared = 0

    def get_all(self)
        self.mutex.acquire()

        try:
            copyOfRemovedEntries = list(self.queue)
            self.queue.clear()
            self.unfinished_tasks = 0
            self.all_tasks_done.notifyAll()
            self.not_full.notifyAll()
            self.tasks_cleared += len(copyOfRemovedEntries)
        finally:
            self.mutex.release()

        return copyOfRemovedEntries

    def clear(self):
        self.get_all()

    def task_done(self):
        self.all_tasks_done.acquire()
        try:
            unfinished = self.unfinished_tasks + self.tasks_cleared - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished - self.tasks_cleared
            self.tasks_cleared = 0
        finally:
            self.all_tasks_done.release() 

Я думаю, что этого следует избегать исключения, но при этом вести себя так, как ожидалось в исходном классе.

1 голос
/ 18 марта 2011

Вы, похоже, страдаете от какого-то состояния расы, и, если я понимаю, текущая ситуация такова, что вы иногда получаете:

T1: |----->|------------->|-------------->|
    | get  |    some_opp  | task_done     |
T2: |---------->|------>|---------------->|
    | other_opp | clear | yet_another_opp |

Если очистка выполняется в пределах get и task_done. Это вызывает сбой. Насколько я понимаю, вам нужно каким-то образом сделать это:

T1: |----->|------------->|-------------->|
    | get  |    some_opp  | task_done     |
T2: |---------->|------------------------>|------>|
    | other_opp | wait_for_task_done      | clear |

Если это правильно, вам может понадобиться вторая блокировка, установленная get и отпущенная task_done, которая говорит, что «эта очередь не может быть очищена». Тогда вам может понадобиться версия get и task_done, которая не делает этого для особых случаев, когда вы действительно знаете, что вы делаете .

Альтернативой этому является более атомная блокировка, которая позволяет вам сделать это:

T1: |----->|------------------->|-------------->|------------->|
    | get  |    some_opp        | task_done     | finish_clear |
T2: |---------->|-------------->|---------------->|
    | other_opp | partial_clear | yet_another_opp |

Когда вы говорите: «Я еще не закончил эту задачу, но вы можете очистить остальное, а затем сообщает task_done, что задача была предпринята для очистки, поэтому она должна что-то делать после. Это начинает становиться довольно сложным, хотя.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...