Как проверить код ответа HTTP от Airflow SimpleHttpOperator? - PullRequest
0 голосов
/ 25 января 2020

Я пытаюсь получить код ответа HTTP от сработавшего Airflow SimpleHttpOperator. Я видел примеры с использованием типа «лямбда», и я делаю это, просматривая текст ответа, но я надеялся, что смогу передать ответ code функции. Мой текущий код (который составляет 90% от example_http_operator):

import json
from datetime import timedelta

from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.sensors.http_sensor import HttpSensor
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['me@company.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

dag = DAG(dag_id='kick_off_java_task', default_args=default_args)

kickoff_task = SimpleHttpOperator(
     task_id='kick_off_c2c_java_task',
     http_conn_id='test',
     method='GET',
     endpoint='',
     data={ "command": "run" },
     response_check=lambda response: True if "Ok Message" in response.text else False,
     headers={},
     xcom_push=False,
     dag=dag
)

Согласно документации и коду, кажется, что есть способ указать response_check на вызываемый объект, но мне не ясен синтаксис, или если мне нужно идти в совершенно другом направлении, например, используя xcom.

1 Ответ

1 голос
/ 27 января 2020

После небольшой проб и ошибок решение оказывается довольно простым:

dag = DAG(dag_id='kick_off_java_task', default_args=default_args)

def check(response):
    if response == 200:
        print("Returning True")
        return True
    else:
        print("Returning False")
        return False

kickoff_task = SimpleHttpOperator(
     task_id='kick_off_c2c_java_task',
     http_conn_id='c2c_test',
     method='GET',
     endpoint='',
     data={ "command": "run" },
     response_check=lambda response: True if check(response.status_code) is True else False,
     headers={},
     xcom_push=False,
     dag=dag
)

с функцией "check" python, определенной до ее использования в лямбде, я могу передать параметр "response.status_code" для этой функции.

...