Задача воздушного потока не останавливается, когда срабатывает тайм-аут выполнения - PullRequest
0 голосов
/ 27 апреля 2018

У меня есть свой собственный пользовательский оператор, расширяющий BaseOperator следующим образом. Я пытался убить задачу, если она длится более 30 минут. Тайм-аут, похоже, срабатывает в соответствии с журналом, но задача все еще продолжается.

Я что-то упустил? Я проверил официальный документ, но не знаю, что не так. https://airflow.apache.org/code.html#baseoperator

Мой оператор выглядит следующим образом.

class MyOperator(BaseOperator):
    @apply_defaults
    def __init__(
      self,
      some_parameters_here,
      *args,
      **kwargs):
      *args,
      **kwargs):
      super(MyOperator, self).__init__(*args, **kwargs)
      # some initialization here

    def execute(self, context):
      # some code here

Моя задача выглядит следующим образом.

t = MyOperator(
     task_id='task',
     dag=scheduled_dag,
     execution_timeout=timedelta(minutes=30)

Я нашел эту ошибку, но задача продолжилась.

[2018-04-12 03:30:28,353] {base_task_runner.py:98} INFO - Subtask: [Stage 6:==================================================(1380 + -160) / 1224][2018-04- 
12 03:30:28,353] {timeout.py:36} ERROR - Process timed out

Ref. https://issues.apache.org/jira/browse/AIRFLOW-2385

1 Ответ

0 голосов
/ 22 мая 2018

Это задание воздушного потока запускает задание Spark. Итак, мне нужно остановить сеанс Spark при вызове метода on_kill по таймауту.

def on_kill(self):
    if (self.spark):
        logging.error('on_kill: stopping spark session...')
        self.spark.stop()
...