У меня есть свой собственный пользовательский оператор, расширяющий BaseOperator следующим образом.
Я пытался убить задачу, если она длится более 30 минут.
Тайм-аут, похоже, срабатывает в соответствии с журналом, но задача все еще продолжается.
Я что-то упустил? Я проверил официальный документ, но не знаю, что не так.
https://airflow.apache.org/code.html#baseoperator
Мой оператор выглядит следующим образом.
class MyOperator(BaseOperator):
@apply_defaults
def __init__(
self,
some_parameters_here,
*args,
**kwargs):
*args,
**kwargs):
super(MyOperator, self).__init__(*args, **kwargs)
# some initialization here
def execute(self, context):
# some code here
Моя задача выглядит следующим образом.
t = MyOperator(
task_id='task',
dag=scheduled_dag,
execution_timeout=timedelta(minutes=30)
Я нашел эту ошибку, но задача продолжилась.
[2018-04-12 03:30:28,353] {base_task_runner.py:98} INFO - Subtask: [Stage 6:==================================================(1380 + -160) / 1224][2018-04-
12 03:30:28,353] {timeout.py:36} ERROR - Process timed out
Ref.
https://issues.apache.org/jira/browse/AIRFLOW-2385