У меня есть веб-сервис (Python 3.7, Flask 1.0.2) с рабочим процессом, состоящим из 3 шагов:
- Шаг 1. Отправка задания удаленного вычисления в коммерческую систему очередей (LSF IBM))
- Шаг 2: Опрос каждые 61 секунда для состояния задания удаленного вычисления (61 секунда из-за результатов в состоянии кэшированного задания)
- Шаг 3: Постобработка данных, если шаг 2 возвращает задание удаленного вычисленияstatus == "DONE"
Задание удаленного вычисления имеет произвольную продолжительность (между секундами и днями), и каждый шаг зависит от завершения предыдущего:
with Connection(redis.from_url(current_app.config['REDIS_URL'])):
q = Queue()
job1 = q.enqueue(step1)
job2 = q.enqueue(step2, depends_on=job1)
job3 = q.enqueue(step3, depends_on=job2)
Однако в конечном итоге все работники (4 работника) будут выполнять опрос (шаг 2 из 4 клиентских запросов), при этом они должны продолжать выполнять шаг 1 других входящих запросов и шаг 3 тех рабочих процессов, которые успешно прошли шаг 2.
Рабочие должны быть освобождены после каждого опроса. Они должны периодически возвращаться к шагу 2 для следующего опроса (не чаще, чем каждые 61 секунда на задание), и если опрос удаленных вычислений не возвращает «ВЫПОЛНЕНО», повторно поставьте задачу опроса в очередь.
В этот момент я начал использовать rq-scheduler
(потому что функции интервалов и очередей казались многообещающими):
with Connection(redis.from_url(current_app.config['REDIS_URL'])):
q = Queue()
s = Scheduler('default')
job1 = q.enqueue(step1, REQ_ID)
job2 = Job.create(step2, (REQ_ID,), depends_on=job1)
job2.meta['interval'] = 61
job2.origin = 'default'
job2.save()
s.enqueue_job(job2)
job3 = q.enqueue(step3, REQ_ID, depends_on=job2)
Job2 создан правильно (включая отношение depends_on
к job1, но s. enqueue_job () выполняет его сразу же, игнорируя его отношение к job1. (Функция doc-string функции q.enqueue_job () фактически говорит, что она выполняется немедленно ...).
Как я могусоздать отношение depends_on
между job1, job2 и job3, когда job2 помещается в планировщик, а не в очередь? (Или как я могу передать job2 планировщику, не выполняя сразу job2 и ожидая завершения job1? )
В целях тестирования шаги выглядят следующим образом:
def step1():
print(f'*** --> [{datetime.utcnow()}] JOB [ 1 ] STARTED...', flush=True)
time.sleep(20)
print(f' <-- [{datetime.utcnow()}] JOB [ 1 ] FINISHED', flush=True)
return True
def step2():
print(f' --> [{datetime.utcnow()}] POLL JOB [ 2 ] STARTED...', flush=True)
time.sleep(10)
print(f' <-- [{datetime.utcnow()}] POLL JOB [ 2 ] FINISHED', flush=True)
return True
def step3():
print(f' --> [{datetime.utcnow()}] JOB [ 3 ] STARTED...', flush=True)
time.sleep(10)
print(f'*** <-- [{datetime.utcnow()}] JOB [ 3 ] FINISHED', flush=True)
return True
И вывод, который я получаю, таков:
worker_1 | 14:44:57 default: project.server.main.tasks.step1(1) (d40256a2-904f-4ce3-98da-6e49b5d370c9)
worker_2 | 14:44:57 default: project.server.main.tasks.step2(1) (3736909c-f05d-4160-9a76-01bb1b18db58)
worker_2 | --> [2019-11-04 14:44:57.341133] POLL JOB [ 2 ] STARTED...
worker_1 | *** --> [2019-11-04 14:44:57.342142] JOB [ 1 ] STARTED...
...
job2 isне дожидаясьr job1 для завершения ...
#requirements.txt
Flask==1.0.2
Flask-Bootstrap==3.3.7.1
Flask-Testing==0.7.1
Flask-WTF==0.14.2
redis==3.3.11
rq==0.13
rq_scheduler==0.9.1