Мультиплекс на очереди. Очередь? - PullRequest
12 голосов
/ 10 декабря 2011

Как я могу "выбрать" несколько queue.Queue одновременно?

Golang имеет желаемую функцию со своими каналами:

select {
case i1 = <-c1:
    print("received ", i1, " from c1\n")
case c2 <- i2:
    print("sent ", i2, " to c2\n")
case i3, ok := (<-c3):  // same as: i3, ok := <-c3
    if ok {
        print("received ", i3, " from c3\n")
    } else {
        print("c3 is closed\n")
    }
default:
    print("no communication\n")
}

При этом первый канал, который разблокирует, выполняет соответствующий блок. Как бы я достиг этого в Python?

Update0

За ссылку , указанную в ответе tux21b , требуемый тип очереди имеет следующие свойства:

  • Многопользовательские / многопользовательские очереди (MPMC)
  • обеспечивает производителя FIFO / LIFO
  • Когда очередь пуста / полные потребители / производители блокируются

Кроме того, каналы могут блокироваться, производители будут блокировать, пока потребитель не получит элемент. Я не уверен, что Очередь Питона может сделать это.

Ответы [ 4 ]

3 голосов
/ 10 декабря 2011

Если вы используете queue.PriorityQueue, вы можете получить аналогичное поведение, используя объекты каналов в качестве приоритетов:

import threading, logging
import random, string, time
from queue import PriorityQueue, Empty
from contextlib import contextmanager

logging.basicConfig(level=logging.NOTSET,
                    format="%(threadName)s - %(message)s")

class ChannelManager(object):
    next_priority = 0

    def __init__(self):
        self.queue = PriorityQueue()
        self.channels = []

    def put(self, channel, item, *args, **kwargs):
        self.queue.put((channel, item), *args, **kwargs)

    def get(self, *args, **kwargs):
        return self.queue.get(*args, **kwargs)

    @contextmanager
    def select(self, ordering=None, default=False):
        if default:
            try:
                channel, item = self.get(block=False)
            except Empty:
                channel = 'default'
                item = None
        else:
            channel, item = self.get()
        yield channel, item


    def new_channel(self, name):
        channel = Channel(name, self.next_priority, self)
        self.channels.append(channel)
        self.next_priority += 1
        return channel


class Channel(object):
    def __init__(self, name, priority, manager):
        self.name = name
        self.priority = priority
        self.manager = manager

    def __str__(self):
        return self.name

    def __lt__(self, other):
        return self.priority < other.priority

    def put(self, item):
        self.manager.put(self, item)


if __name__ == '__main__':
    num_channels = 3
    num_producers = 4
    num_items_per_producer = 2
    num_consumers = 3
    num_items_per_consumer = 3

    manager = ChannelManager()
    channels = [manager.new_channel('Channel#{0}'.format(i))
                for i in range(num_channels)]

    def producer_target():
        for i in range(num_items_per_producer):
            time.sleep(random.random())
            channel = random.choice(channels)
            message = random.choice(string.ascii_letters)
            logging.info('Putting {0} in {1}'.format(message, channel))
            channel.put(message)

    producers = [threading.Thread(target=producer_target,
                                  name='Producer#{0}'.format(i))
                 for i in range(num_producers)]
    for producer in producers:
        producer.start()
    for producer in producers:
        producer.join()
    logging.info('Producers finished')

    def consumer_target():
        for i in range(num_items_per_consumer):
            time.sleep(random.random())
            with manager.select(default=True) as (channel, item):
                if channel:
                    logging.info('Received {0} from {1}'.format(item, channel))
                else:
                    logging.info('No data received')

    consumers = [threading.Thread(target=consumer_target,
                                  name='Consumer#{0}'.format(i))
                 for i in range(num_consumers)]
    for consumer in consumers:
        consumer.start()
    for consumer in consumers:
        consumer.join()
    logging.info('Consumers finished')

Пример вывода:

Producer#0 - Putting x in Channel#2
Producer#2 - Putting l in Channel#0
Producer#2 - Putting A in Channel#2
Producer#3 - Putting c in Channel#0
Producer#3 - Putting z in Channel#1
Producer#1 - Putting I in Channel#1
Producer#1 - Putting L in Channel#1
Producer#0 - Putting g in Channel#1
MainThread - Producers finished
Consumer#1 - Received c from Channel#0
Consumer#2 - Received l from Channel#0
Consumer#0 - Received I from Channel#1
Consumer#0 - Received L from Channel#1
Consumer#2 - Received g from Channel#1
Consumer#1 - Received z from Channel#1
Consumer#0 - Received A from Channel#2
Consumer#1 - Received x from Channel#2
Consumer#2 - Received None from default
MainThread - Consumers finished

В этом примере ChannelManagerэто просто оболочка вокруг queue.PriorityQueue, которая реализует метод select как contextmanager, чтобы он выглядел аналогично оператору select в Go.

Несколько замечаний:

  • Порядок

    • В примере Go порядок, в котором каналы записываются внутри оператора select, определяет, какой код канала будет выполняться при наличии данных.доступно для более чем одного канала.

    • В примере с питоном порядок определяется приоритетом, назначенным каждому каналу.Однако приоритет может быть динамически назначен каждому каналу (как видно в примере), поэтому изменение порядка будет возможно с более сложным методом select, который заботится о назначении новых приоритетов на основе аргумента метода.Кроме того, старый порядок можно восстановить после завершения работы менеджера контекста.

  • Блокировка

    • В примере Goоператор select блокируется, если существует случай default.

    • В примере с python логический аргумент должен быть передан методу select, чтобы сделать его понятнымкогда блокировка / неблокирование желательно.В неблокирующем случае канал, возвращаемый менеджером контекста, является просто строкой 'default', поэтому в коде внутри легко обнаружить это в коде внутри оператора with.

  • Потоки: Объект в модуле queue уже готов для сценариев с несколькими производителями, как в примере с несколькими производителями.

2 голосов
/ 09 июня 2013

Проект pychan дублирует каналы Go в Python, включая мультиплексирование. Он реализует тот же алгоритм, что и Go, поэтому он отвечает всем вашим желаемым свойствам:

  • Несколько производителей и потребителей могут общаться через Чана. Когда и производитель, и потребитель готовы, пара из них заблокирует
  • Производители и потребители обслуживаются в том порядке, в котором они прибыли (FIFO)
  • Пустая (полная) очередь заблокирует потребителей (производителей).

Вот как будет выглядеть ваш пример:

c1 = Chan(); c2 = Chan(); c3 = Chan()

try:
    chan, value = chanselect([c1, c3], [(c2, i2)])
    if chan == c1:
        print("Received %r from c1" % value)
    elif chan == c2:
        print("Sent %r to c2" % i2)
    else:  # c3
        print("Received %r from c3" % value)
except ChanClosed as ex:
    if ex.which == c3:
        print("c3 is closed")
    else:
        raise

(Полное раскрытие: я написал эту библиотеку)

2 голосов
/ 17 декабря 2011

Существует много различных реализаций очередей производитель-потребитель, таких как queue.Queue .Они обычно отличаются по многим свойствам, как указано в этой превосходной статье Дмитрия Вьюкова.Как видите, возможны более 10 тысяч различных комбинаций.Алгоритмы, используемые для таких очередей, также сильно различаются в зависимости от требований.Невозможно просто расширить существующий алгоритм очереди, чтобы гарантировать дополнительные свойства, поскольку для этого обычно требуются разные внутренние структуры данных и разные алгоритмы.

Каналы Go предлагают относительно большое количество гарантированных свойств, поэтому эти каналы могут быть подходящимидля многих программ.Одним из самых сложных требований является поддержка одновременного чтения / блокировки на нескольких каналах (оператор select) и справедливый выбор канала, если более чем одна ветвь в операторе select может быть выполнена, так что не останется ни одного сообщения.,Python queue.Queue не предлагает эти функции, поэтому просто невозможно заархивировать такое же поведение с ним.

Итак, если вы хотите продолжить использовать queue.Queue , вам нужно найти обходные пути для этой проблемы.Обходные пути, однако, имеют свой собственный список недостатков и их сложнее поддерживать.Поиск другой очереди производитель-потребитель, которая предлагает функции, которые вам нужны, может быть лучшей идеей!В любом случае, есть два возможных обходных пути:

Опрос

while True:
  try:
    i1 = c1.get_nowait()
    print "received %s from c1" % i1
  except queue.Empty:
    pass
  try:
    i2 = c2.get_nowait()
    print "received %s from c2" % i2
  except queue.Empty:
    pass
  time.sleep(0.1)

Это может использовать много циклов ЦП при опросе каналов и может быть медленным при наличиимного сообщений.Использование time.sleep () с экспоненциальным временем задержки (вместо показанных здесь постоянных 0,1 с) может значительно улучшить эту версию.

Одиночная очередь уведомлений

queue_id = notify.get()
if queue_id == 1:
  i1 = c1.get()
  print "received %s from c1" % i1
elif queue_id == 2:
  i2 = c2.get()
  print "received %s from c2" % i2

При такой настройке вы должны отправить что-то в очередь уведомлений после отправки на c1 или c2.Это может сработать для вас, если вам достаточно только одной такой очереди уведомлений (т. Е. У вас нет нескольких «выборок», каждый из которых блокирует свое подмножество ваших каналов).

В качестве альтернативы вы можетеТакже рассмотрите возможность использования Go.Goroutines и поддержка параллелизма Go намного мощнее, чем ограниченные возможности Python для потоков.

1 голос
/ 17 декабря 2011
from queue import Queue

# these imports needed for example code
from threading import Thread
from time import sleep
from random import randint

class MultiQueue(Queue):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queues = []

    def addQueue(self, queue):
        queue.put = self._put_notify(queue, queue.put)
        queue.put_nowait = self._put_notify(queue, queue.put_nowait)
        self.queues.append(queue)

    def _put_notify(self, queue, old_put):
        def wrapper(*args, **kwargs):
            result = old_put(*args, **kwargs)
            self.put(queue)
            return result
        return wrapper

if __name__ == '__main__':
    # an example of MultiQueue usage

    q1 = Queue()
    q1.name = 'q1'
    q2 = Queue()
    q2.name = 'q2'
    q3 = Queue()
    q3.name = 'q3'

    mq = MultiQueue()
    mq.addQueue(q1)
    mq.addQueue(q2)
    mq.addQueue(q3)

    queues = [q1, q2, q3]
    for i in range(9):
        def message(i=i):
            print("thread-%d starting..." % i)
            sleep(randint(1, 9))
            q = queues[i%3]
            q.put('thread-%d ending...' % i)
        Thread(target=message).start()

    print('awaiting results...')
    for _ in range(9):
        result = mq.get()
        print(result.name)
        print(result.get())

Вместо того, чтобы пытаться использовать метод .get() для нескольких очередей, идея состоит в том, чтобы очереди уведомляли MultiQueue, когда они готовы к данным - сортировка select в обратном порядке.Это достигается за счет использования MultiQueue различных методов Queue put() и put_nowait(), чтобы при добавлении чего-либо в эти очереди эта очередь затем put() помещалась в MultiQueue исоответствующий MultiQueue.get() извлечет Queue, у которого есть готовые данные.

Этот MultiQueue основан на очереди FIFO, но вы также можете использовать очереди LIFO или Priority в качестве базы в зависимости от ваших потребностей.

...