Воздушный поток on_success_callback () не запускается - PullRequest
0 голосов
/ 28 декабря 2018

Я запускаю конвейер, используя Airflow, который содержит несколько операторов Bash, которые должны быть выполнены.
Каждый оператор имеет атрибуты on_failure_callback и on_success_callback, которые вызывают функцию для отправки электронного письма со статусом задачи (успех /сбой) и загружает сгенерированный файл журнала из каталога в hdfs.В следующих фрагментах кода показан пример оператора, который я использую, и вызываемой функции.

  • Оператор Bash :

op =BashOperator (task_id = 'test_op', bash_command = 'python3 run.py', on_failure_callback = fail_email, on_success_callback = success_email, повторы = 3, dag = dag)

  • success_email :
def success_email(contextDict,**kwargs):
    """Send custom email alerts."""

    # email title.
    title = "Task {} SUCCEEDED. Execution date: {}".format(contextDict['task'].task_id, contextDict['execution_date'])


    # email contents
    body = """

         <br>
         The correspondent log file:
         <br>
         {}
          """.format(hdfs_log)

    print("Uploading log to hdfs")
    subprocess.check_call(["hdfs", "dfs", "-mkdir", "-p", hdfs_log_folder])
    subprocess.check_call(["hdfs", "dfs", "-put", local_log, hdfs_log])
    send_email('email@domain.com', title, html_content=body)

Параметр success_callback всегда вызывает сбой при вызове команд hdfs и выдает следующую ошибку:

[2018-12-28 09:13:29,727] INFO - Uploading log to hdfs

[2018-12-28 09:13:30,344] INFO - [2018-12-28 09:13:30,342] WARNING - State of this instance has been externally set to success. Taking the poison pill.

[2018-12-28 09:13:30,381] INFO - Sending Signals.SIGTERM to GPID 11515
[2018-12-28 09:13:30,382] ERROR - Received SIGTERM. Terminating subprocesses.
[2018-12-28 09:13:30,382] INFO - Sending SIGTERM signal to bash process group
[2018-12-28 09:13:30,390] ERROR - Failed when executing success callback
[2018-12-28 09:13:30,390] ERROR - [Errno 3] No such process
Traceback (most recent call last):
  File "/opt/hadoop/airflow/python/lib/python3.6/site-packages/airflow/models.py", line 1687, in _run_raw_task
    task.on_success_callback(context)
  File "/usr/local/airflow/dags/Appl_FUMA.py", line 139, in success_email
    subprocess.check_call(["hdfs", "dfs", "-mkdir", "-p", hdfs_log_folder])
  File "/usr/lib64/python3.6/subprocess.py", line 286, in check_call
    retcode = call(*popenargs, **kwargs)
  File "/usr/lib64/python3.6/subprocess.py", line 269, in call
    return p.wait(timeout=timeout)
  File "/usr/lib64/python3.6/subprocess.py", line 1457, in wait
    (pid, sts) = self._try_wait(0)
  File "/usr/lib64/python3.6/subprocess.py", line 1404, in _try_wait
    (pid, sts) = os.waitpid(self.pid, wait_flags)
  File "/opt/hadoop/airflow/python/lib/python3.6/site-packages/airflow/models.py", line 1611, in signal_handler
    task_copy.on_kill()
  File "/opt/hadoop/airflow/python/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 125, in on_kill
    os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
ProcessLookupError: [Errno 3] No such process
[2018-12-28 09:13:30,514] INFO - Process psutil.Process(pid=11515 (terminated)) (11515) terminated with exit code 0
[2018-12-28 09:13:30,514] INFO - Process psutil.Process(pid=20649 (terminated)) (20649) terminated with exit code None
[2018-12-28 09:13:30,514] INFO - Process psutil.Process(pid=11530 (terminated)) (11530) terminated with exit code None
[2018-12-28 09:13:30,515] INFO - [2018-12-28 09:13:30,515] INFO - Task exited with return code 0

Однако этоудается посылать электронные письма (иногда), когда я закомментирую две строки подпроцессов.Есть идеи, как решить эту проблему?

...