Я запускаю простой http-датчик в Airflow:
from airflow import DAG
from airflow.sensors.http_sensor import HttpSensor
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime, timedelta
default_args = {
"owner": "airflow",
"start_date": datetime(2019, 1, 1),
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"email": "youremail@host.com",
"retries": 1,
"retry_delay": timedelta(minutes=5)
}
with DAG(dag_id="forex_data_pipeline_v_5", schedule_interval="@daily", default_args=default_args, catchup=False) as dag:
is_forex_rates_available = HttpSensor(
task_id="is_forex_rates_available",
method="GET",
http_conn_id="http_default",
endpoint="latest",
response_check=lambda response: "rates" in response.text,
poke_interval=5,
timeout=20
)
Это поле http_conn_id="forex_api",
вызывает у меня проблемы. Я добавил HTTP-соединение под названием «forex_api», установил тип HTTP и дал URL-адрес. Я продолжаю получать следующую ошибку:
airflow.exceptions.AirflowException: The conn_id `forex_api` isn't defined
Я знаю, что это определено, потому что я вижу это в пользовательском интерфейсе. Я взял уже определенный http_default
и попытался обновить конечную точку, и обновление не отображается в airflow test
, с которым я работаю. Это говорит мне о том, что база данных не синхронизируется, как я ожидаю. Я запускаю это в Docker, и я установил fernet_key
глобально. Что может происходить?