Как исправить ошибку воздушного потока: print_context () отсутствует 1 обязательный позиционный аргумент: 'ds' - PullRequest
0 голосов
/ 26 ноября 2018

У меня есть метка, как показано ниже: ingest_excel.py:

from __future__ import print_function

import time
from builtins import range
from datetime import timedelta
from pprint import pprint

import airflow
from airflow.models import DAG
#from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'rxie',
    'start_date': airflow.utils.dates.days_ago(2),
}

dag = DAG(
    dag_id='ingest_excel',
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60),
)

def print_context(**kwargs):
    pprint("DAG info below:")
    pprint(kwargs)
    return 'Whatever you return gets printed in the logs'


t11_extract_excel_to_csv = PythonOperator(
    task_id='t1_extract_excel_to_csv',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
)


t12_upload_csv_to_hdfs_parquet = PythonOperator(
    task_id='t12_upload_csv_to_hdfs_parquet',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
)


t13_register_parquet_to_impala = PythonOperator(
    task_id='t13_register_parquet_to_impala',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
)

t21_text_to_parquet = PythonOperator(
    task_id='t21_text_to_parquet',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
)

t22_register_parquet_to_impala = PythonOperator(
    task_id='t22_register_parquet_to_impala',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
)

t31_verify_completion = PythonOperator(
    task_id='t31_verify_completion',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
)

t32_send_notification = PythonOperator(
    task_id='t32_send_notification',
    provide_context=True,
    python_callable=print_context(),
    op_kwargs=None,
    dag=dag,
)

t11_extract_excel_to_csv >> t12_upload_csv_to_hdfs_parquet
t12_upload_csv_to_hdfs_parquet >> t13_register_parquet_to_impala

t21_text_to_parquet >> t22_register_parquet_to_impala


t13_register_parquet_to_impala >> t31_verify_completion
t22_register_parquet_to_impala >> t31_verify_completion

t31_verify_completion >> t32_send_notification


#if __name__ == "__main__":
#    dag.cli()

В графическом интерфейсе DAG он запрашивает:

Сломанный DAG: [/ root / airflow / dags / ingest_excel.py] python_callable param должен быть вызванза меня.

Заранее спасибо.

Ответы [ 2 ]

0 голосов
/ 26 ноября 2018

Чтобы уточнить вашу проблему: ваш процесс не работает, потому что вы не передаете функцию print_context в PythonOperator, вы передаете результат вызова print_context:

[...]

t32_send_notification = PythonOperator(
    task_id='t32_send_notification',
    provide_context=True,
    python_callable=print_context(), # <-- This is the issue.
    op_kwargs=None,
    dag=dag,
)

[...]

Ваша функция возвращает строку 'Whatever you return gets printed in the logs', которая, в свою очередь, предоставляется PythonOperator в аргументе ключевого слова python_callable.Airflow пытается выполнить следующее:

your_return = 'Whatever you return gets printed in the logs'
your_return()

... и вы получаете сообщение об ошибке.Другой участник правильно сказал, что вы должны изменить аргумент ключевого слова PythonOperator.python_callable на просто print_context

0 голосов
/ 26 ноября 2018

Я не совсем уверен, почему ваш код не работает.Это должно работать, но обходной путь дан ниже.

def print_context(**kwargs):
ds = kwargs['ds']

также python_callable должен передаваться следующим образом

python_callable=print_context,
...