Расширение Python Queue.PriorityQueue (рабочий приоритет, типы рабочих пакетов) - PullRequest
1 голос
/ 03 октября 2010

Я хотел бы расширить Queue.PriorityQueue, описанное здесь: http://docs.python.org/library/queue.html#Queue.PriorityQueue

В очереди будут храниться рабочие пакеты с приоритетом.Рабочие получат рабочие пакеты и обработают их.Я хочу сделать следующие дополнения:

  1. Работники тоже имеют приоритет.Когда несколько рабочих простаивают, входящий рабочий пакет должен обрабатывать тот, кто имеет наивысший приоритет.

  2. Не каждый работник может обрабатывать каждый рабочий пакет, поэтому необходим механизм, который проверяет, является ли тип рабочего пакетаи рабочие возможности совпадают.

Я ищу подсказки, как это лучше всего реализовать (начиная с нуля, расширяя PrioriyQueue или Queue, ...).

edit

Вот моя первая (не проверенная) попытка.Основная идея заключается в том, что все ожидающие потоки будут уведомлены.Затем все они пытаются получить рабочий элемент через _choose_worker(self, worker).(Сделано в сообществе вики)

edit

Работает для некоторых простых тестов сейчас ...

edit Добавлена ​​пользовательская BaseManager и локальная копия списка работников в функции _choose_worker.

edit ошибкаисправить

import Queue
from Queue import Empty, Full
from time import time as _time
import heapq

class AdvancedQueue(Queue.PriorityQueue):

    # Initialize the queue representation
    def _init(self, _maxsize):
        self.queue = []
        self.worker = []

    def put(self, item, block=True, timeout=None):
        '''
        Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a positive number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        '''
        self.not_full.acquire()
        try:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() == self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() == self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                else:
                    endtime = _time() + timeout
                    while self._qsize() == self.maxsize:
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notifyAll()  # only change
        finally:
            self.not_full.release()

    def get(self, worker, block=True, timeout=None):
        self.not_empty.acquire()
        try:
            self._put_worker(worker)

            if not block:
                if not self._qsize():
                    raise Empty
                else:
                    return self._choose_worker(worker)
            elif timeout is None:
                while True:
                    while not self._qsize():
                        self.not_empty.wait()
                    try:
                        return self._choose_worker(worker)
                    except Empty:
                        self.not_empty.wait()

            elif timeout < 0:
                raise ValueError("'timeout' must be a positive number")
            else:
                endtime = _time() + timeout
                def wait(endtime):
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)

                while True:
                    while not self._qsize():
                        wait(endtime)

                    try:
                        return self._choose_worker(worker)
                    except Empty:
                        wait(endtime)
        finally:
            self._remove_worker(worker)
            self.not_empty.release()

    # Put a new worker in the worker queue
    def _put_worker(self, worker, heappush=heapq.heappush):
        heappush(self.worker, worker)

    # Remove a worker from the worker queue
    def _remove_worker(self, worker):
        self.worker.remove(worker)

    # Choose a matching worker with highest priority
    def _choose_worker(self, worker):
        worker_copy = self.worker[:]    # we need a copy so we can remove assigned worker
        for item in self.queue:
            for enqueued_worker in worker_copy:
                if item[1].type in enqueued_worker[1].capabilities:
                    if enqueued_worker == worker:
                        self.queue.remove(item)
                        self.not_full.notify()
                        return item
                    else:
                        worker_copy.remove(enqueued_worker)
                        # item will be taken by enqueued_worker (which has higher priority),
                        # so enqueued_worker is busy and can be removed
                        continue
        raise Empty

Ответы [ 2 ]

1 голос
/ 04 октября 2010

Я думаю, что вы описываете ситуацию, когда у вас есть две «приоритетные очереди» - одна для рабочих мест и одна для рабочих. Наивный подход заключается в том, чтобы взять работу с наивысшим приоритетом и работника с наивысшим приоритетом и попытаться соединить их. Но, конечно, это не помогает, когда работник не может выполнить задание.

Чтобы исправить это, я бы предложил сначала взять на себя работу с наивысшим приоритетом, а затем итерировать по всем рабочим в порядке убывания приоритета, пока не найдете тот, который сможет обработать эту работу. Если никто из работников не может обработать задание, тогда возьмите вторую работу с наивысшим приоритетом и так далее. Таким образом, у вас есть вложенные циклы, что-то вроде этого:

def getNextWorkerAndJobPair():
    for job in sorted(jobs, key=priority, reverse=True):
        for worker in sorted(workers, key=priority, reverse=True):
             if worker.can_process(job):
                 return (worker, job)

Приведенный выше пример сортирует данные без необходимости много раз. Чтобы избежать этого, лучше всего хранить данные уже в отсортированном порядке. Что касается того, какие структуры данных использовать, я не совсем уверен, что лучше. В идеале вам нужно, чтобы O (log n) вставляло и удаляло, и чтобы можно было перебирать коллекцию в отсортированном порядке за O (n). Я думаю, что PriorityQueue соответствует первому из этих требований, но не второму. Я полагаю, что сортированный список из пакета blist сработает, но я сам не пробовал, и на веб-странице не говорится о гарантиях производительности, которые предлагает этот класс.

Способ, которым я предлагал сначала выполнить итерации по заданиям, а затем по рабочим во внутреннем цикле, - это не единственный подход, который вы могли бы использовать. Вы также можете изменить порядок циклов, чтобы сначала выбрать работника с наивысшим приоритетом, а затем попытаться найти для него работу. Или вы можете найти допустимую пару (работа, работник), которая имеет максимальное значение f (priority_job, priority_worker) для некоторой функции f (например, просто добавить приоритеты).

0 голосов
/ 12 октября 2010

Единственный ответ был полезным, но недостаточно подробным, поэтому я пока приму свой собственный ответ.Смотрите код в вопросе.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...