воздушный поток общаться между задачами без xcom - PullRequest
1 голос
/ 18 февраля 2020

Я выяснил, что xcom фактически записывает данные в базу данных и извлекает их из другой задачи. У меня большой набор данных, его перебирают и записывают в базу данных, что вызывает некоторые ненужные задержки. Есть ли способ передачи данных между задачами в одном и том же воздушном потоке Dag без использования xcom?

ниже приведен код, который я пробовал, контекст фактически не передается. Я знаю, что использование task_instance.xcom_push() будет работать, но оно также будет обрабатывать данные и записывать их в базу данных, которая мне не нужна.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from pandas import DataFrame
import pandas as pd
from custom.dataframe_to_postgres_operator import PostgresOperatorBulk
from airflow.operators.postgres_operator import PostgresOperator

def read_df(task_instance, **context):
    df = pd.read_parquet('/usr/local/airflow/data/df.parquet.gzip')
    print(df)
    # task_instance.xcom_push('data', df)
    context.update({'data': df})
    for k, v in context.items():
        print(k, v)
    return 1

def get_df(task_instance, **context):
    for k, v in context.items():
        print(k, v)
    df = context['data']

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 2, 17),
    'retries': 0,
}

dag = DAG('abcdefg', default_args=default_args, schedule_interval=timedelta(days=1))

task_read_df = PythonOperator(
    task_id='read_df',
    python_callable=read_df,
    dag=dag,
    provide_context=True,
    do_xcom_push=False
)

task_get_df = PythonOperator(
    task_id='get_df',
    python_callable=get_df,
    dag=dag,
    provide_context=True,
    do_xcom_push=False
)

task_read_df >> task_get_df

1 Ответ

0 голосов
/ 18 февраля 2020

Если у вас есть большой набор данных, который вы хотите обменять, я предлагаю сохранить данные в какой-либо форме во временном местоположении (например, в указанном каталоге), а затем передать путь к такому временному файлу или файлам, используя XCOM (который для небольших фрагментов данных это дешево и дает достаточно хорошую производительность).

Для этого хорошая библиотека - tempfile, которая помогает облегчить боль, чтобы избежать дублирования между временными файлами.

Почему XCOM, а не общий контекст выполнения между Задачами

Учитывая, что Задачи могут быть выполнены в параллельно , это первая проблема, которая создает большие трудности для Python (GIL, параллельное совместное использование данных).

Во-вторых, чтобы обеспечить некоторую устойчивость (и, следовательно, устойчивость к сбоям), вы должны использовать базу данных для обеспечения ACID .

Все это делает механизм XCOM относительно тяжелым (особенно когда вы добавляете травление поверх него), но он универсален.

Имея все это в виду, вы должны помнить использование временных файлов, к которым вы передаете путь через XCOM, обеспечивает тот же уровень устойчивости, что и сам XCOM (особенно, если файл хранится на RAM-диске). Он также не поддерживает воспроизведение задач, если вы не храните временные файлы неопределенно долго.

...