Как проверить состояние долгосрочной задачи http с потоком воздуха? - PullRequest
0 голосов
/ 01 декабря 2018

Мой пример использования - управление большим количеством запланированных заданий на микро-сервисах с использованием воздушного потока.Решение, которое я пытаюсь использовать, состоит в том, чтобы использовать поток воздуха в качестве централизованного планировщика заданий и запускать задания, совершая http-вызовы.Некоторые из этих заданий будут выполняться долго, например.более 10 минут или до 1 часа.

Как я могу регулярно проверять состояние этих заданий по воздушному потоку?Что если удаленная задача завершена, но поток воздуха не знает об успешной работе?Могу ли я опубликовать событие для завершения задания в kafka и заставить прослушивание потока воздуха на kafka для получения статуса задания?

1 Ответ

0 голосов
/ 02 декабря 2018

Есть много способов сделать это с помощью Airflow и ваших микросервисов.В общем, вы захотите использовать датчик, это подходящий объект Airflow для чего-то вроде этого.Начните с проверки BaseSensorOperator и около операторов .В воздушном потоке датчики используются так же, как операторы (датчики являются операторами).Таким образом, вы можете создать работу следующим образом:

http_post_task -> http_sensor_task -> success_task

Где http_post_task будет запускать работу, http_sensor_task будет периодически проверять, выполнена ли работа (например, GET запрашивает микросервис и может проверять 200, может быть?)и success_task будет выполняться после успешного выполнения http_sensor_task.

Ваш http_sensor_task должен быть вашим собственным датчиком.Вот некоторый код sudo, который может помочь вам создать этот датчик (помните, что датчики используются как операторы).Рассмотрим случай, когда вы отправляете запрос в микросервис, а затем делаете еще один запрос для проверки статуса задания (запрос GET и проверка 200), вы расширите тип BaseSensorOperator следующим образом:

from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from time import sleep
import requests

class HTTPSensorOperator(BaseSensorOperator): 
    """
    Pokes a URL until it returns 200
    """
    ui_color = '#000000'
    @apply_defaults
    def __init__( self, url, *args, **kwargs):
        super(HTTPSensorOperator, self).__init__(*args, **kwargs)
        self.url = url


    def poke(self, context):
        """
        GET request url and return True if response is 200, False otherwise
        """
        r = requests.post(self.url)
        if r.status_code == 200:
            return True
        else:
            return False

    def execute(self, context):
        """
        Check the url and wait for it to return 200.
        """
        started_at = datetime.utcnow()
        while not self.poke(context):
            if (datetime.utcnow() - started_at).total_seconds() > self.timeout:
                if self.soft_fail:
                    raise AirflowSkipException("Exporting {0}/{1} took to long.".format(self.project, self.instance))
                else:
                    raise AirflowSkipException("Exporting {0}/{1} took to long.".format(self.project, self.instance))
            sleep(self.poke_interval)
        self.log.info("Success criteria met. Exiting.")

Затем воспользуйтесь оператором, например:

http_sensor_task = HTTPSensorOperator(
      task_id="http_sensor_task",
      url="http://localhost/check_job?job_id=1",
      timeout=3600, # 1 hour
      dag=dag
   )

Таким образом, вам придется решить, как ваши микросервисы будут взаимодействовать с Airflow.Мне кажется, что вы сделаете 1 запрос на запуск задания, а затем сделаете последующий запрос (возможно, через 10 секунд) для проверки задания.Удачи!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...