Параметры потока, передаваемые оператору Python - PullRequest
0 голосов
/ 16 февраля 2019

Я пытаюсь написать оператор Python в DAG воздушного потока и передать определенные параметры вызываемому Python.

Мой код выглядит следующим образом.

def my_sleeping_function(threshold):
   print(threshold)

fmfdependency = PythonOperator(
   task_id='poke_check',
   python_callable=my_sleeping_function,
   provide_context=True,
   op_kwargs={'threshold': 100},
   dag=dag)

end = BatchEndOperator(
   queue=QUEUE,
   dag=dag)

start.set_downstream(fmfdependency)
fmfdependency.set_downstream(end)

Но я получаю сообщение об ошибке ниже.

TypeError: my_sleeping_function () получил неожиданный аргумент ключевого слова 'dag_run'

Невозможно выяснить, почему.

Ответы [ 2 ]

0 голосов
/ 16 июля 2019

Так вы можете передавать аргументы для оператора Python в Airflow.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

from time import sleep
from datetime import datetime

def my_func(*op_args):
        print(op_args)
        return op_args[0]

with DAG('python_dag', description='Python DAG', schedule_interval='*/5 * * * *', start_date=datetime(2018, 11, 1), catchup=False) as dag:
        dummy_task      = DummyOperator(task_id='dummy_task', retries=3)
        python_task     = PythonOperator(task_id='python_task', python_callable=my_func, op_args=['one', 'two', 'three'])

        dummy_task >> python_task
0 голосов
/ 16 февраля 2019

Добавьте ** kwargs в список параметров вашего оператора после вашего порогового параметра

...