Параллелизм в очереди redis - PullRequest
0 голосов
/ 20 июня 2019

Я работаю с приложением django, размещенным на heroku, с редистого addon: nano pack.Я использую rq для выполнения задач в фоновом режиме - задачи инициируются онлайн-пользователями.Я ограничен в увеличении количества соединений, я боюсь ограниченных ресурсов.

В настоящее время у меня есть один рабочий, который работает с числом n очередей.Каждая очередь использует экземпляр соединения из пула соединений для обработки «n» различных типов задач.Например, скажем, если 4 пользователя инициируют задачу одного типа, я бы хотел, чтобы мой основной сотрудник динамически создавал дочерние процессы для их обработки.Есть ли способ добиться требуемой многопроцессорности и параллелизма?

Я пробовал с модулем multiprocessing, изначально без введения Lock();но это выставляет и перезаписывает данные, переданные пользователем в функцию инициализации, с данными предыдущего запроса.После применения блокировок он запрещает второму пользователю инициировать запросы, возвращая server error - 500

ссылку на github # 1 : похоже, команда работает над PR;еще не выпущен!

github link # 2 : Этот пост помогает объяснить создание большего количества рабочих во время выполнения.Это решение, однако, также переопределяет данные.Новый запрос снова обрабатывается с данными предыдущих запросов.

Дайте мне знать, если вам нужен какой-то код.Я постараюсь опубликовать минимальный воспроизводимый фрагмент.

Есть какие-нибудь мысли / предложения / рекомендации?

Ответы [ 2 ]

0 голосов
/ 29 июня 2019

У вас был шанс попробовать AutoWorker ?

Spawn RQ Workers автоматически.

from autoworker import AutoWorker
aw = AutoWorker(queue='high', max_procs=6)
aw.work()

Используется multiprocessing с StrictRedis из redis модуля и последующий импорт из rq

from rq.contrib.legacy import cleanup_ghosts
from rq.queue import Queue
from rq.worker import Worker, WorkerStatus
0 голосов
/ 22 июня 2019

Заглянув под капот, я понял, что Worker класс уже реализует многопроцессорность.

Функция work внутренне вызывает execute_job(job, queue), что в свою очередь, как указано в модуле

Создает рабочую лошадь для выполнения фактической работы и передает ей работу.

Рабочий будет ждать рабочей лошади и следить за тем, чтобы она выполнялась в заданных пределах времени ожидания,

или закончит рабочую лошадь с помощью SIGALRM.

Функция execute_job() неявно вызывает fork_work_horse(job, queue), который порождает рабочую лошадь для выполнения фактической работы и передает ей задание согласно следующей логике:


def fork_work_horse(self, job, queue):

        child_pid = os.fork()
        os.environ['RQ_WORKER_ID'] = self.name
        os.environ['RQ_JOB_ID'] = job.id
        if child_pid == 0:
            self.main_work_horse(job, queue)
        else:
            self._horse_pid = child_pid
            self.procline('Forked {0} at {1}'.format(child_pid, time.time()))


main_work_horse выполняет внутренний вызов perform_job(job, queue), который выполняет несколько других вызовов для фактического выполнения задания.

Все шаги по жизненному циклу работника, упомянутые на официальной странице документации rq , выполняются в рамках этих вызовов.

Это не многопроцессорная обработка, которую я ожидал, но я думаю, у них есть способ сделать что-то. Однако на мой оригинальный пост до сих пор нет ответа, также я все еще не уверен в параллельности ..

Над документацией еще нужно поработать, поскольку она едва ли охватывает истинную сущность этой библиотеки!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...