HttpOperator или HttpHook для HTTPS в потоке воздуха - PullRequest
1 голос
/ 03 октября 2019

Я работаю над небольшим доказательством концепции Airflow в Google Cloud.

По сути, я хочу создать рабочий процесс, который будет загружать данные из REST API (https), преобразовывать эти данные в формат JSON и загружать их в хранилище Google Cloud.

У меня естьуже сделал это с чистым кодом Python, и это работает. Довольно просто! Но так как я хочу запланировать это и есть некоторые зависимости, Airflow должен быть идеальным инструментом для этого.

После тщательного прочтения документации Airflow я увидел, что HttpOperator и / или HttpHook могут справиться с задачейдля части загрузки.

Я создал свое соединение Http в WebUI с моим адресом электронной почты / паролем для авторизации следующим образом:

{Conn Id: "atlassian_marketplace", Conn Type:"HTTP", хост: "https://marketplace.atlassian.com/rest/2", Схема: нет / пусто, логин:" мое имя пользователя ", пароль:" мой пароль ", порт: нет / пусто, дополнительно: нет / пусто}

Первый вопрос: -Когда использовать SimpleHttpOperator против HttpHook?

Второй вопрос: -Как мы используем SimpleHttpOperator или HttpHook с вызовами HTTP?

Третий вопрос: -Как мы получаем доступ кданные, возвращаемые вызовом API?

В моем случае функция XCOM не сработает, потому что эти вызовы API могут вернуть много данных (100-300 МБ)!

посмотрите в Google, чтобы найти бывшегоДостаточно кода о том, как использовать оператор / ловушку для моего варианта использования, но я пока не нашел ничего полезного.

Есть идеи?

Я поместил здесь скелет моего кода до сих пор.

# Usual Airflow import

# Dag creation
dag = DAG(
    'get_reporting_links',
    default_args=default_args,
    description='Get reporting links',
    schedule_interval=timedelta(days=1))

# Task 1: Dummy start
start = DummyOperator(task_id="Start", retries=2, dag=dag)

# Task 2: Connect to Atlassian Marketplace
get_data = SimpleHttpOperator(
            http_conn_id="atlassian_marketplace",
          endpoint="/vendors/{vendorId}/reporting".format({vendorId: "some number"}), 
            method="GET")

# Task 3: Save JSON data locally
# TODO: transform_json: transform to JSON get_data.json()?

# Task 4: Upload data to GCP
# TODO: upload_gcs: use Airflow GCS connection

# Task 5: Stop
stop = DummyOperator(task_id="Stop", retries=2, dag=dag)

# Dependencies
start >> get_data >> transform_json >> upload_gcs >> stop

1 Ответ

0 голосов
/ 04 октября 2019

Посмотрите на следующий пример:

# Usual Airflow import

# Dag creation
dag = DAG(
    'get_reporting_links',
    default_args=default_args,
    description='Get reporting links',
    schedule_interval=timedelta(days=1))

# Task 1: Dummy start
start = DummyOperator(task_id="Start", retries=2, dag=dag)

# Task 2: Connect to Atlassian Marketplace
get_data = SimpleHttpOperator(
     task_id="get_data",
     http_conn_id="atlassian_marketplace",
     endpoint="/vendors/{vendorId}/reporting".format({vendorId: "some number"}), 
     method="GET",
     xcom_push=True,
)

def transform_json(**kwargs):
    ti = kwargs['ti']
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='get_data')
    ...
    # transform the json here and save the content to a file


# Task 3: Save JSON data locally
save_and_transform = PythonOperator(
    task_id="save_and_transform", 
    python_callable=transform_json,
    provide_context=True,
)

# Task 4: Upload data to GCP
upload_to_gcs = FileToGoogleCloudStorageOperator(...)

# Task 5: Stop
stop = DummyOperator(task_id="Stop", retries=2, dag=dag)

# Dependencies
start >> get_data >> save_and_transform >> upload_to_gcs >> stop
...