Как реализовать приоритетную многопроцессорную очередь в Python? - PullRequest
5 голосов
/ 30 августа 2009

Кто-нибудь знает, как я могу реализовать приоритетную многопроцессорную очередь в python?

Ответы [ 6 ]

5 голосов
/ 30 августа 2009

Увы, это нигде не так просто, как изменить дисциплину обслуживания старого доброго Queue.Queue: последний на самом деле предназначен для создания подклассов в соответствии с шаблоном метода-шаблона и переопределяет только методы хука _put и / или _get может легко позволить изменить дисциплину организации очередей (в 2.6 предлагается явный LIFO и приоритетные реализации, но их было легко сделать даже в более ранних версиях Python).

Что касается многопроцессорной обработки, то в общем случае (несколько читателей, несколько писателей) я не вижу решения для реализации приоритетных очередей, кроме как отказаться от распределенной природы очереди; назначить один специальный вспомогательный процесс, который ничего не делает, кроме обработки очередей, посылать (по существу) RPC в него, чтобы создать очередь с указанной дисциплиной, do помещает и получает к ней, получает информацию об этом и т. д. Таким образом, можно было бы получить обычные проблемы с гарантией того, что каждый процесс знает о расположении вспомогательного процесса (скажем, хост и порт) и т. Д. (Проще, если процесс всегда вызывается при запуске основным процессором). Довольно большая проблема, особенно если кто-то хочет сделать это с хорошей производительностью, защитит от сбоев aux proc (требующих репликации данных на подчиненные процессы, распределенных «главных выборов» среди подчиненных, если мастер сбоит и т. Д.) И так далее. Делать это с нуля звучит как работа доктора философии. Можно было бы начать с работы Джонсона или использовать какой-то очень общий подход, например ActiveMQ .

Некоторые особые случаи (например, один читатель, один писатель) могут быть проще и оказываются быстрее для их ограниченной области применения; но тогда для этой ограниченной области должна быть разработана очень специфически ограниченная спецификация, и результаты не будут представлять собой (универсальную) «многопроцессорную очередь», а применимы только к данному ограниченному набору требований.

2 голосов
/ 30 августа 2009

Существует ошибка, препятствующая истинному FIFO.
Читайте здесь .

Альтернативный способ создания многопроцессорной установки с приоритетной очередью, безусловно, был бы великолепен!

1 голос
/ 17 ноября 2009

У меня был такой же вариант использования. Но с конечным числом приоритетов.

То, что я в итоге делаю, - это создание одной очереди для каждого приоритета, и мои работники процесса будут пытаться получить элементы из этих очередей, начиная с самой важной очереди до менее важной (переход от одной очереди к другой сделано, когда очередь пуста)

1 голос
/ 30 августа 2009

Хотя это не ответ, возможно, он может помочь вам разработать многопроцессорную очередь.

Вот простой приоритетный класс класс, который я написал с использованием Python Array :

class PriorityQueue():
    """A basic priority queue that dequeues items with the smallest priority number."""
    def __init__(self):
        """Initializes the queue with no items in it."""
        self.array = []
        self.count = 0

    def enqueue(self, item, priority):
        """Adds an item to the queue."""
        self.array.append([item, priority])
        self.count += 1

    def dequeue(self):
        """Removes the highest priority item (smallest priority number) from the queue."""
        max = -1
        dq = 0
        if(self.count > 0):
            self.count -= 1

            for i in range(len(self.array)):
                if self.array[i][1] != None and self.array[i][1] > max:
                    max = self.array[i][1]

            if max == -1:
                return self.array.pop(0)
            else:
                for i in range(len(self.array)):
                    if self.array[i][1] != None and self.array[i][1] <= max:
                        max = self.array[i][1]
                        dq = i
                return self.array.pop(dq)

    def requeue(self, item, newPrio):
        """Changes specified item's priority."""
        for i in range(len(self.array)):
            if self.array[i][0] == item:
                self.array[i][1] = newPrio
                break

    def returnArray(self):
        """Returns array representation of the queue."""
        return self.array

    def __len__(self):
        """Returnes the length of the queue."""
        return self.count
0 голосов
/ 18 марта 2014

Вдохновленный предложением @ user211505, я собрал что-то быстрое и грязное.

Обратите внимание, что это не полное решение сложной проблемы очередей с приоритетом в многопроцессорных производственных средах. Однако, если вы просто бездельничаете или нуждаетесь в чем-то для короткого проекта, это, вероятно, будет отвечать всем требованиям:

from time import sleep
from datetime import datetime
from Queue import Empty
from multiprocessing import Queue as ProcessQueue

class SimplePriorityQueue(object):
    '''
    Simple priority queue that works with multiprocessing. Only a finite number 
    of priorities are allowed. Adding many priorities slow things down. 

    Also: no guarantee that this will pull the highest priority item 
    out of the queue if many items are being added and removed. Race conditions
    exist where you may not get the highest priority queue item.  However, if 
    you tend to keep your queues not empty, this will be relatively rare.
    '''
    def __init__(self, num_priorities=1, default_sleep=.2):
        self.queues = []
        self.default_sleep = default_sleep
        for i in range(0, num_priorities):
            self.queues.append(ProcessQueue())

    def __repr__(self):
        return "<Queue with %d priorities, sizes: %s>"%(len(self.queues), 
                    ", ".join(map(lambda (i, q): "%d:%d"%(i, q.qsize()), 
                                enumerate(self.queues))))

    qsize = lambda(self): sum(map(lambda q: q.qsize(), self.queues))

    def get(self, block=True, timeout=None):
        start = datetime.utcnow()
        while True:
            for q in self.queues:
                try:
                    return q.get(block=False)
                except Empty:
                    pass
            if not block:
                raise Empty
            if timeout and (datetime.utcnow()-start).total_seconds > timeout:
                raise Empty

            if timeout:
                time_left = (datetime.utcnow()-start).total_seconds - timeout
                sleep(time_left/4)
            else:
                sleep(self.default_sleep)

    get_nowait = lambda(self): self.get(block=False)

    def put(self, priority, obj, block=False, timeout=None):
        if priority < 0 or priority >= len(self.queues):
            raise Exception("Priority %d out of range."%priority)
        # Block and timeout don't mean much here because we never set maxsize
        return self.queues[priority].put(obj, block=block, timeout=timeout)
0 голосов
/ 31 августа 2009

В зависимости от ваших требований вы можете использовать операционную систему и файловую систему несколькими способами. Насколько большой будет расти очередь и как быстро она должна быть? Если очередь может быть большой, но вы готовы открывать пару файлов для каждого доступа к очереди, вы можете использовать реализацию BTree для хранения очереди и блокировки файлов для обеспечения монопольного доступа. Медленно, но крепко.

Если очередь останется относительно маленькой, и вам нужно, чтобы она была быстрой, возможно, вы сможете использовать общую память в некоторых операционных системах ...

Если очередь будет маленькой (1000 записей) и вам не нужно чтобы быть очень быстрым, вы могли бы использовать что-то так же просто, как каталог с файлами, содержащими данные с блокировкой файлов. Это было бы моим предпочтением, если маленький и медленный в порядке.

Если очередь может быть большой и вы хотите, чтобы она была в среднем быстрой, то вам, вероятно, следует использовать выделенный серверный процесс, как предлагает Алекс. Однако это боль в шее.

Каковы ваши требования к производительности и размеру?

...