Как обработать сбой задачи в префекте и вернуть SUCCESS с параметром on_failure? - PullRequest
1 голос
/ 28 января 2020

У меня есть Flow in prefect, чей task, чей вывод dataframe. В приведенном ниже примере всегда происходит сбой. Я бы хотел, чтобы task вернул пустое dataframe с состоянием SUCCESS, используя @task(on_failure=handle_task_fail). Какой правильный синтаксис для этого?

1 Ответ

2 голосов
/ 28 января 2020

Здесь происходит две вещи:

1.) Generi c Обработчики состояний: они могут быть установлены через state_handlers kwarg и будут вызываться в каждом состоянии. изменение. Обработчик состояния должен иметь подпись state_handler(task: Task, old_state: State, new_state: State) -> Optional[State] (которая является используемой вами подписью); состояние Задачи после вызова этого обработчика будет состоянием, которое возвращается из обработчика, или new_state, если возвращается None.

2.) Обратные вызовы при сбое: on_failure kwarg, который вы использование здесь предназначено для удобства API для обработчиков состояния; функции, которые передаются этому ключевому слову, должны иметь подпись fn(task: Task, state: State) -> None и будут вызываться только тогда, когда эта задача переходит в состояние Failed. Обратите внимание, что при сбое обратные вызовы не могут изменить состояние Задачи так, как это могут делать обработчики состояний.

В вашем примере вы, похоже, смешиваете два аргумента ключевого слова. Я полагаю, что следующий код сделает то, что вы ожидаете:

from prefect.engine.state import Success


def handle_disambig_error(task, old_state, new_state):
    if new_state.is_failed():
        return_state = Success(result=pd.DataFrame())
    else:
        return_state = new_state
    return return_state

@task(state_handlers=[handle_disambig_error])
def get_wiki_resource():
   return df
...