Python ProcessPoolExecutor с ограниченной очередью - PullRequest
0 голосов
/ 20 февраля 2020

Есть ли способ, чтобы ProcessPoolExecutor имел ограниченную очередь для обработки входящих запросов, когда все работники заняты?

В документации не объясняется, что происходит, если вызывается метод submit () и все работники занятый. Тем не менее, я провел небольшое исследование, и оказалось, что ProcessPoolExecutor имеет свою собственную внутреннюю очередь, которая, по-видимому, не ограничена. Как правило, использование несвязанных очередей не является хорошей практикой, поскольку Исполнитель может использоваться для взлома sh системы (атака DoS). Примерно так может легко взломать sh систему, если «some_function» занимает слишком много времени для запуска и получает аргументы, которые имеют большие размеры.

with ProcessPoolExecutor(max_workers=5) as executor:
    for arg in range(10000000000000):
        future = executor.submit(some_function, args)

Мне было интересно, есть ли способ ограничить размер внутренней очереди или использовать внешнюю?

Ответы [ 2 ]

1 голос
/ 26 февраля 2020

Более чистый подход, позволяющий избежать вмешательства во внутренние органы ProcessPoolExecutor, заключался бы в использовании BoundedSemaphore , который увеличивается при каждом выполнении задачи и уменьшается при каждом выполнении задачи.

Преимущество этого заключается в блокировке отправки вместо выдачи ошибки.

Рабочий пример можно найти в этом gist .

0 голосов
/ 24 февраля 2020

Как уже упоминалось в моем вопросе, ProcessPoolExecutor имеет свою собственную внутреннюю очередь, которая не ограничена. Однако ProcessPoolExecutor._queue_count подсчитывает количество активных запросов (запущено + ожидающих).

Для меня ограничено, просто создайте оболочку поверх ProcessPoolExecutor, чтобы проверить счетчик и выдать некоторое исключение времени выполнения, если число превышает желаемый максимальный размер очереди:

    self._max_queue_size = self._max_workers + max_queue_size 

а затем:

def submit(self, fn, *args, **kwargs) -> Future:
    if self._executor._queue_count >= self._max_queue_size:
        raise RuntimeError(
            f"{self.__class__.__name__} has reached its maximum of "
            f"{self._max_queue_size} active (running + queued) requests.")
    return self._executor.submit(fn, *args, **kwargs)

Возможно, это не самое лучшее и чистое решение, но оно, безусловно, сработало для меня.

...