Есть много способов сделать это с помощью Airflow и ваших микросервисов.В общем, вы захотите использовать датчик, это подходящий объект Airflow для чего-то вроде этого.Начните с проверки BaseSensorOperator и около операторов .В воздушном потоке датчики используются так же, как операторы (датчики являются операторами).Таким образом, вы можете создать работу следующим образом:
http_post_task -> http_sensor_task -> success_task
Где http_post_task будет запускать работу, http_sensor_task будет периодически проверять, выполнена ли работа (например, GET запрашивает микросервис и может проверять 200, может быть?)и success_task будет выполняться после успешного выполнения http_sensor_task.
Ваш http_sensor_task должен быть вашим собственным датчиком.Вот некоторый код sudo, который может помочь вам создать этот датчик (помните, что датчики используются как операторы).Рассмотрим случай, когда вы отправляете запрос в микросервис, а затем делаете еще один запрос для проверки статуса задания (запрос GET и проверка 200), вы расширите тип BaseSensorOperator следующим образом:
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from time import sleep
import requests
class HTTPSensorOperator(BaseSensorOperator):
"""
Pokes a URL until it returns 200
"""
ui_color = '#000000'
@apply_defaults
def __init__( self, url, *args, **kwargs):
super(HTTPSensorOperator, self).__init__(*args, **kwargs)
self.url = url
def poke(self, context):
"""
GET request url and return True if response is 200, False otherwise
"""
r = requests.post(self.url)
if r.status_code == 200:
return True
else:
return False
def execute(self, context):
"""
Check the url and wait for it to return 200.
"""
started_at = datetime.utcnow()
while not self.poke(context):
if (datetime.utcnow() - started_at).total_seconds() > self.timeout:
if self.soft_fail:
raise AirflowSkipException("Exporting {0}/{1} took to long.".format(self.project, self.instance))
else:
raise AirflowSkipException("Exporting {0}/{1} took to long.".format(self.project, self.instance))
sleep(self.poke_interval)
self.log.info("Success criteria met. Exiting.")
Затем воспользуйтесь оператором, например:
http_sensor_task = HTTPSensorOperator(
task_id="http_sensor_task",
url="http://localhost/check_job?job_id=1",
timeout=3600, # 1 hour
dag=dag
)
Таким образом, вам придется решить, как ваши микросервисы будут взаимодействовать с Airflow.Мне кажется, что вы сделаете 1 запрос на запуск задания, а затем сделаете последующий запрос (возможно, через 10 секунд) для проверки задания.Удачи!