Воздушный поток Python Operator с.тип возврата - PullRequest
0 голосов
/ 15 февраля 2019

У меня есть оператор python в моей DAG.Вызываемая функция Python возвращает значение bool.Но когда я запускаю DAG, я получаю ошибку ниже.

TypeError: объект 'bool' не вызывается

Я изменил функцию, чтобы ничего не возвращать, но потом снова получаю сообщение об ошибке ниже

ОШИБКА - объект 'NoneType' не может быть вызван

Ниже мой dag

def check_poke(threshold,sleep_interval):
flag=snowflake_poke(1000,10).poke()
#print(flag)
return flag

dependency = PythonOperator(
task_id='poke_check',
#python_callable=check_poke(129600,600),
provide_context=True,
python_callable=check_poke(129600,600),
dag=dag)

end = BatchEndOperator(
queue=QUEUE,
dag=dag)

start.set_downstream(dependency)
dependency.set_downstream(end)

Невозможно понять, что я пропускаю.Может кто-нибудь помочь мне в этом ... Совершенно новичок в воздушном потоке.

Я отредактировал оператор python в теге, как показано ниже

dependency = PythonOperator(
task_id='poke_check',
provide_context=True,
python_callable=check_poke(129600,600),
dag=dag)

Но теперь я получаю другую ошибку.

Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1245, in run
    result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/dist-packages/airflow/operators/python_operator.py", line 66, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
TypeError: () takes no arguments (25 given)
[2019-02-15 05:30:25,375] {models.py:1298} INFO - Marking task as UP_FOR_RETRY
[2019-02-15 05:30:25,393] {models.py:1327} ERROR - () takes no arguments (25 given)

Ответы [ 2 ]

0 голосов
/ 15 февраля 2019

Согласны с @ Даном Д. по вопросу;но это сбивает с толку, почему его решение не сработало (оно, безусловно, работает в python shell )

Посмотрите, найдет ли это вам какую-то удачу (это просто многословный вариант @ DanD. решение)

from typing import Callable

# your original check_poke function
def check_poke(arg_1: int, arg_2: int) -> bool:
    # do something
    # somehow returns a bool
    return arg_1 < arg_2

# a function that returns a callable, that in turn invokes check_poke
# with the supplied params
def check_poke_wrapper_creator(arg_1: int, arg_2: int) -> Callable[[], bool]:
    def check_poke_wrapper() -> bool:
        return check_poke(arg_1=arg_1, arg_2=arg_2)

    return check_poke_wrapper

..

# usage
python_callable=check_poke_wrapper_creator(129600, 600)
0 голосов
/ 15 февраля 2019

Имя аргумента выдает его.Вы передаете результат вызова, а не вызываемый.

python_callable=check_poke(129600,600)

Вторая ошибка гласит, что вызываемый вызывается с 25 аргументами.Так что lambda: не будет работать.Следующее будет работать, но игнорирование 25 аргументов действительно сомнительно.

python_callable=lambda *args, **kwargs: check_poke(129600,600)
...