Является ли этот подход без блокировок производителем-потребителем Python поточно-ориентированным? - PullRequest
6 голосов
/ 13 мая 2009

Я недавно написал программу, в которой использовался простой шаблон производитель / потребитель. Изначально в ней была ошибка, связанная с неправильным использованием потоков. Блокировка, которую я в итоге исправил. Но это заставило меня задуматься о том, можно ли реализовать шаблон производителя / потребителя без блокировки.

Требования в моем случае были просты:

  • Одна ветка производителя.
  • Одна потребительская нить.
  • В очереди есть место только для одного элемента.
  • Производитель может произвести следующий предмет до того, как будет использован текущий. Текущий предмет поэтому потерян, но для меня это нормально.
  • Потребитель может потреблять текущий предмет до того, как будет произведен следующий. Поэтому текущий предмет потребляется дважды (или больше), но для меня это нормально.

Итак, я написал это:

QUEUE_ITEM = None

# this is executed in one threading.Thread object
def producer():
    global QUEUE_ITEM
    while True:
        i = produce_item()
        QUEUE_ITEM = i

# this is executed in another threading.Thread object
def consumer():
    global QUEUE_ITEM
    while True:
        i = QUEUE_ITEM
        consume_item(i)

Мой вопрос: является ли этот код потокобезопасным?

Немедленный комментарий: этот код на самом деле не без блокировки - я использую CPython и у него есть GIL.

Я немного протестировал код, и, похоже, он работает. Это переводит к некоторым операциям LOAD и STORE, которые являются атомарными из-за GIL. Но я также знаю, что операция del x не является атомарной, когда x реализует метод __del__. Поэтому, если у моего предмета есть метод __del__ и произойдет какое-то неприятное планирование, вещи могут сломаться. Или нет?

Другой вопрос: какие ограничения (например, для типа производимых элементов) я должен наложить, чтобы вышеописанный код работал нормально?

Мои вопросы касаются только теоретической возможности использовать причуды CPython и GIL, чтобы найти решение без блокировки (то есть без блокировок, таких как threading.Lock явно в коде).

Ответы [ 6 ]

6 голосов
/ 13 мая 2009

Обманит тебя. Просто используйте очередь для связи между потоками.

2 голосов
/ 13 мая 2009

Да, это будет работать так, как вы описали:

  1. Чтобы производитель мог изготовить пропускаемый элемент.
  2. что потребитель может потреблять один и тот же элемент.

Но я также знаю, что операция del x не является атомарной, когда x реализует метод del . Так что, если у моего предмета есть метод del и произойдет какое-то неприятное планирование, вещи могут сломаться.

Я не вижу здесь "del". Если в потребляющем элементе происходит del, тогда в потоке производителя может возникнуть del . Я не думаю, что это будет "проблемой".

Не беспокойтесь об этом. В конечном итоге вы будете использовать ЦП на бессмысленных циклах опроса, и это будет не быстрее, чем использование очереди с блокировками, поскольку Python уже имеет глобальную блокировку.

1 голос
/ 13 мая 2009

Это не действительно потокобезопасный, потому что производитель может перезаписать QUEUE_ITEM до того, как потребитель его потребит, а потребитель может потреблять QUEUE_ITEM дважды. Как вы упомянули, с этим у вас все в порядке, но большинство людей - нет.

Кто-то, у кого больше знаний о внутренностях cpython, должен будет ответить на более теоретические вопросы.

0 голосов
/ 13 мая 2009

__del__ может быть проблемой, как Вы сказали. Этого можно было бы избежать, если бы существовал способ предотвратить сборщик мусора от вызова метода __del__ для старого объекта до того, как мы закончим назначать новый для QUEUE_ITEM. Нам нужно что-то вроде:

increase the reference counter on the old object
assign a new one to `QUEUE_ITEM`
decrease the reference counter on the old object

Боюсь, я не знаю, возможно ли это, хотя.

0 голосов
/ 13 мая 2009

Вы можете использовать список в качестве очереди до тех пор, пока вы добавляете / добавляете, поскольку оба являются атомарными.

QUEUE = []

# this is executed in one threading.Thread object
def producer():
    global QUEUE
    while True:
        i = produce_item()
        QUEUE.append(i)

# this is executed in another threading.Thread object
def consumer():
    global QUEUE
    while True:
        try:
            i = QUEUE.pop(0)
        except IndexError:
            # queue is empty
            continue

        consume_item(i)

В области видимости, как показано ниже, вы даже можете очистить очередь.

class Atomic(object):
    def __init__(self):
        self.queue = []

    # this is executed in one threading.Thread object
    def producer(self):
        while True:
            i = produce_item()
            self.queue.append(i)

    # this is executed in another threading.Thread object
    def consumer(self):
        while True:
            try:
                i = self.queue.pop(0)
            except IndexError:
                # queue is empty
                continue

            consume_item(i)

    # There's the possibility producer is still working on it's current item.
    def clear_queue(self):
        self.queue = []

Вам нужно выяснить, какие операции со списком являются атомарными, взглянув на сгенерированный байт-код.

0 голосов
/ 13 мая 2009

Я думаю, что возможно, что поток прерывается во время производства / потребления, особенно если предметы - большие объекты. Редактировать: это просто дикая догадка. Я не эксперт.

Кроме того, потоки могут производить / потреблять любое количество элементов до запуска другого.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...