Как создать отношение `` disabled_on`` между запланированными и поставленными в очередь заданиями в python-rq - PullRequest
4 голосов
/ 04 ноября 2019

У меня есть веб-сервис (Python 3.7, Flask 1.0.2) с рабочим процессом, состоящим из 3 шагов:

  • Шаг 1. Отправка задания удаленного вычисления в коммерческую систему очередей (LSF IBM))
  • Шаг 2: Опрос каждые 61 секунда для состояния задания удаленного вычисления (61 секунда из-за результатов в состоянии кэшированного задания)
  • Шаг 3: Постобработка данных, если шаг 2 возвращает задание удаленного вычисленияstatus == "DONE"

Задание удаленного вычисления имеет произвольную продолжительность (между секундами и днями), и каждый шаг зависит от завершения предыдущего:

with Connection(redis.from_url(current_app.config['REDIS_URL'])):
    q = Queue()
    job1 = q.enqueue(step1)
    job2 = q.enqueue(step2, depends_on=job1)
    job3 = q.enqueue(step3, depends_on=job2)

Однако в конечном итоге все работники (4 работника) будут выполнять опрос (шаг 2 из 4 клиентских запросов), при этом они должны продолжать выполнять шаг 1 других входящих запросов и шаг 3 тех рабочих процессов, которые успешно прошли шаг 2.

Рабочие должны быть освобождены после каждого опроса. Они должны периодически возвращаться к шагу 2 для следующего опроса (не чаще, чем каждые 61 секунда на задание), и если опрос удаленных вычислений не возвращает «ВЫПОЛНЕНО», повторно поставьте задачу опроса в очередь.


В этот момент я начал использовать rq-scheduler (потому что функции интервалов и очередей казались многообещающими):

with Connection(redis.from_url(current_app.config['REDIS_URL'])):
    q = Queue()
    s = Scheduler('default')

    job1 = q.enqueue(step1, REQ_ID)

    job2 = Job.create(step2, (REQ_ID,), depends_on=job1)
    job2.meta['interval'] = 61
    job2.origin = 'default'
    job2.save()
    s.enqueue_job(job2)

    job3 = q.enqueue(step3, REQ_ID, depends_on=job2)

Job2 создан правильно (включая отношение depends_on к job1, но s. enqueue_job () выполняет его сразу же, игнорируя его отношение к job1. (Функция doc-string функции q.enqueue_job () фактически говорит, что она выполняется немедленно ...).

Как я могусоздать отношение depends_on между job1, job2 и job3, когда job2 помещается в планировщик, а не в очередь? (Или как я могу передать job2 планировщику, не выполняя сразу job2 и ожидая завершения job1? )


В целях тестирования шаги выглядят следующим образом:

def step1():
    print(f'*** --> [{datetime.utcnow()}] JOB [ 1 ] STARTED...', flush=True)
    time.sleep(20)
    print(f'    <-- [{datetime.utcnow()}] JOB [ 1 ] FINISHED', flush=True)
    return True

def step2():
    print(f'    --> [{datetime.utcnow()}] POLL JOB [ 2 ] STARTED...', flush=True)
    time.sleep(10)
    print(f'    <-- [{datetime.utcnow()}] POLL JOB [ 2 ] FINISHED', flush=True)
    return True

def step3():
    print(f'    --> [{datetime.utcnow()}] JOB [ 3 ] STARTED...', flush=True)
    time.sleep(10)
    print(f'*** <-- [{datetime.utcnow()}] JOB [ 3 ] FINISHED', flush=True)
    return True

И вывод, который я получаю, таков:

worker_1     | 14:44:57 default: project.server.main.tasks.step1(1) (d40256a2-904f-4ce3-98da-6e49b5d370c9)
worker_2     | 14:44:57 default: project.server.main.tasks.step2(1) (3736909c-f05d-4160-9a76-01bb1b18db58)
worker_2     |     --> [2019-11-04 14:44:57.341133] POLL JOB [ 2 ] STARTED...
worker_1     | *** --> [2019-11-04 14:44:57.342142] JOB [ 1 ] STARTED...
...

job2 isне дожидаясьr job1 для завершения ...


#requirements.txt
Flask==1.0.2
Flask-Bootstrap==3.3.7.1
Flask-Testing==0.7.1
Flask-WTF==0.14.2
redis==3.3.11
rq==0.13
rq_scheduler==0.9.1

1 Ответ

0 голосов
/ 08 ноября 2019

Мое решение этой проблемы использует только rq (и больше rq_scheduler):

  1. Обновление до последней версии пакета python-rq:

    # requirements.txt
    ...
    rq==1.1.0
    
  2. Создайте выделенную очередь для заданий опроса и соответственно поставьте в очередь задания (с отношением depends_on):

    with Connection(redis.from_url(current_app.config['REDIS_URL'])):
        q = Queue('default')
        p = Queue('pqueue')
        job1 = q.enqueue(step1)
        job2 = p.enqueue(step2, depends_on=job1)  # step2 enqueued in polling queue
        job3 = q.enqueue(step3, depends_on=job2)
    
  3. Извлечение выделенного работника дляочередь на опрос. Он наследует от стандартного Worker класса:

    class PWorker(rq.worker.Worker):
        def execute_job(self, *args, **kwargs):
            seconds_between_polls = 65
            job = args[0]
            if 'lastpoll' in job.meta:
                job_timedelta = (datetime.utcnow() - job.meta["lastpoll"]).total_seconds()
                if job_timedelta < seconds_between_polls:
                    sleep_period = seconds_between_polls - job_timedelta
                    time.sleep(sleep_period)
            job.meta['lastpoll'] = datetime.utcnow()
            job.save_meta()
    
            super().execute_job(*args, **kwargs)
    

    . PWorker расширяет метод execute_job, добавляя метку времени к метаданным задания 'lastpoll'.

    Если приходит задание опроса с отметкой времени lastpoll, работник проверяет, превышает ли период времени с lastpoll 65 секунд. Если это так, он записывает текущее время в 'lastpoll' и выполняет опрос. Если нет, он спит до тех пор, пока не истечет 65, а затем записывает текущее время в 'lastpoll' и выполняет опрос. Задание, поступающее без отметки времени lastpoll, опрашивается впервые, и работник создает отметку времени и выполняет опрос.

  4. Создайте выделенное исключение (которое будет выдано задачейфункция) и обработчик исключений, чтобы справиться с ним:

    # exceptions.py
    
    class PACError(Exception):
        pass
    
    class PACJobRun(PACError):
        pass
    
    class PACJobExit(PACError):
        pass
    
    # exception_handlers.py
    
    def poll_exc_handler(job, exc_type, exc_value, traceback):
        if exc_type is PACJobRun:
            requeue_job(job.get_id(), connection=job.connection)
            return False  # no further exception handling
        else:
            return True  # further exception handling
    
    # tasks.py
    
    def step2():
        # GET request to remote compute job portal API for status
        # if response == "RUN":
        raise PACJobRun
        return True
    

    Когда обработчик пользовательских исключений ловит пользовательское исключение (что означает, что задание удаленного вычисления все еще выполняется), он запрашиваетзадание в очереди опроса.

  5. Вставьте обработчик пользовательских исключений в иерархию обработки исключений:

    # manage.py
    
    @cli.command('run_pworker')
    def run_pworker():
        redis_url = app.config['REDIS_URL']
        redis_connection = redis.from_url(redis_url)
        with rq.connections.Connection(redis_connection):
            pworker = PWorker(app.config['PQUEUE'], exception_handlers=[poll_exc_handler])
            pworker.work()
    

Приятная особенность этого решенияв том, что он расширяет стандартную функциональность python-rq всего несколькими строками дополнительного кода. С другой стороны, есть дополнительная сложность дополнительной очереди и рабочего ...

...