Здесь происходит две вещи:
1.) Generi c Обработчики состояний: они могут быть установлены через state_handlers
kwarg и будут вызываться в каждом состоянии. изменение. Обработчик состояния должен иметь подпись state_handler(task: Task, old_state: State, new_state: State) -> Optional[State]
(которая является используемой вами подписью); состояние Задачи после вызова этого обработчика будет состоянием, которое возвращается из обработчика, или new_state
, если возвращается None
.
2.) Обратные вызовы при сбое: on_failure
kwarg, который вы использование здесь предназначено для удобства API для обработчиков состояния; функции, которые передаются этому ключевому слову, должны иметь подпись fn(task: Task, state: State) -> None
и будут вызываться только тогда, когда эта задача переходит в состояние Failed
. Обратите внимание, что при сбое обратные вызовы не могут изменить состояние Задачи так, как это могут делать обработчики состояний.
В вашем примере вы, похоже, смешиваете два аргумента ключевого слова. Я полагаю, что следующий код сделает то, что вы ожидаете:
from prefect.engine.state import Success
def handle_disambig_error(task, old_state, new_state):
if new_state.is_failed():
return_state = Success(result=pd.DataFrame())
else:
return_state = new_state
return return_state
@task(state_handlers=[handle_disambig_error])
def get_wiki_resource():
return df