Я работаю над небольшим доказательством концепции Airflow в Google Cloud.
По сути, я хочу создать рабочий процесс, который будет загружать данные из REST API (https), преобразовывать эти данные в формат JSON и загружать их в хранилище Google Cloud.
У меня естьуже сделал это с чистым кодом Python, и это работает. Довольно просто! Но так как я хочу запланировать это и есть некоторые зависимости, Airflow должен быть идеальным инструментом для этого.
После тщательного прочтения документации Airflow я увидел, что HttpOperator и / или HttpHook могут справиться с задачейдля части загрузки.
Я создал свое соединение Http в WebUI с моим адресом электронной почты / паролем для авторизации следующим образом:
{Conn Id: "atlassian_marketplace", Conn Type:"HTTP", хост: "https://marketplace.atlassian.com/rest/2", Схема: нет / пусто, логин:" мое имя пользователя ", пароль:" мой пароль ", порт: нет / пусто, дополнительно: нет / пусто}
Первый вопрос: -Когда использовать SimpleHttpOperator против HttpHook?
Второй вопрос: -Как мы используем SimpleHttpOperator или HttpHook с вызовами HTTP?
Третий вопрос: -Как мы получаем доступ кданные, возвращаемые вызовом API?
В моем случае функция XCOM не сработает, потому что эти вызовы API могут вернуть много данных (100-300 МБ)!
посмотрите в Google, чтобы найти бывшегоДостаточно кода о том, как использовать оператор / ловушку для моего варианта использования, но я пока не нашел ничего полезного.
Есть идеи?
Я поместил здесь скелет моего кода до сих пор.
# Usual Airflow import
# Dag creation
dag = DAG(
'get_reporting_links',
default_args=default_args,
description='Get reporting links',
schedule_interval=timedelta(days=1))
# Task 1: Dummy start
start = DummyOperator(task_id="Start", retries=2, dag=dag)
# Task 2: Connect to Atlassian Marketplace
get_data = SimpleHttpOperator(
http_conn_id="atlassian_marketplace",
endpoint="/vendors/{vendorId}/reporting".format({vendorId: "some number"}),
method="GET")
# Task 3: Save JSON data locally
# TODO: transform_json: transform to JSON get_data.json()?
# Task 4: Upload data to GCP
# TODO: upload_gcs: use Airflow GCS connection
# Task 5: Stop
stop = DummyOperator(task_id="Stop", retries=2, dag=dag)
# Dependencies
start >> get_data >> transform_json >> upload_gcs >> stop