I am running a pipeline in Airflow that contains several BashOperators to be executed. Each operator has on_failure_callback and on_success_callback attributes, which call a function that sends an email with the task status (success/failure) and uploads the generated log file from a local directory to HDFS. The following snippets show an example of the operator I use and the function it calls.
op = BashOperator(
    task_id='test_op',
    bash_command='python3 run.py',
    on_failure_callback=fail_email,
    on_success_callback=success_email,
    retries=3,
    dag=dag,
)
import subprocess

from airflow.utils.email import send_email


def success_email(contextDict, **kwargs):
    """Send custom email alerts."""
    # Email title.
    title = "Task {} SUCCEEDED. Execution date: {}".format(
        contextDict['task'].task_id, contextDict['execution_date'])
    # Email contents; hdfs_log, hdfs_log_folder and local_log are defined
    # elsewhere in the DAG file.
    body = """
    <br>
    The correspondent log file:
    <br>
    {}
    """.format(hdfs_log)
    # Upload the generated log file to HDFS before sending the notification.
    print("Uploading log to hdfs")
    subprocess.check_call(["hdfs", "dfs", "-mkdir", "-p", hdfs_log_folder])
    subprocess.check_call(["hdfs", "dfs", "-put", local_log, hdfs_log])
    send_email('email@domain.com', title, html_content=body)
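For reference, the two subprocess calls amount to running hdfs dfs -mkdir -p followed by hdfs dfs -put. A standalone sketch with placeholder paths (the real hdfs_log_folder, local_log and hdfs_log are defined elsewhere in my DAG file):

import subprocess

# Placeholder values for illustration only; the actual paths are defined
# elsewhere in the DAG file.
hdfs_log_folder = "/user/airflow/logs"
local_log = "/tmp/test_op.log"
hdfs_log = hdfs_log_folder + "/test_op.log"

# Create the target directory (if missing) and copy the local log into it.
subprocess.check_call(["hdfs", "dfs", "-mkdir", "-p", hdfs_log_folder])
subprocess.check_call(["hdfs", "dfs", "-put", local_log, hdfs_log])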
The success_callback always fails when it reaches the hdfs commands, throwing the following error:
[2018-12-28 09:13:29,727] INFO - Uploading log to hdfs
[2018-12-28 09:13:30,344] INFO - [2018-12-28 09:13:30,342] WARNING - State of this instance has been externally set to success. Taking the poison pill.
[2018-12-28 09:13:30,381] INFO - Sending Signals.SIGTERM to GPID 11515
[2018-12-28 09:13:30,382] ERROR - Received SIGTERM. Terminating subprocesses.
[2018-12-28 09:13:30,382] INFO - Sending SIGTERM signal to bash process group
[2018-12-28 09:13:30,390] ERROR - Failed when executing success callback
[2018-12-28 09:13:30,390] ERROR - [Errno 3] No such process
Traceback (most recent call last):
File "/opt/hadoop/airflow/python/lib/python3.6/site-packages/airflow/models.py", line 1687, in _run_raw_task
task.on_success_callback(context)
File "/usr/local/airflow/dags/Appl_FUMA.py", line 139, in success_email
subprocess.check_call(["hdfs", "dfs", "-mkdir", "-p", hdfs_log_folder])
File "/usr/lib64/python3.6/subprocess.py", line 286, in check_call
retcode = call(*popenargs, **kwargs)
File "/usr/lib64/python3.6/subprocess.py", line 269, in call
return p.wait(timeout=timeout)
File "/usr/lib64/python3.6/subprocess.py", line 1457, in wait
(pid, sts) = self._try_wait(0)
File "/usr/lib64/python3.6/subprocess.py", line 1404, in _try_wait
(pid, sts) = os.waitpid(self.pid, wait_flags)
File "/opt/hadoop/airflow/python/lib/python3.6/site-packages/airflow/models.py", line 1611, in signal_handler
task_copy.on_kill()
File "/opt/hadoop/airflow/python/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 125, in on_kill
os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
ProcessLookupError: [Errno 3] No such process
[2018-12-28 09:13:30,514] INFO - Process psutil.Process(pid=11515 (terminated)) (11515) terminated with exit code 0
[2018-12-28 09:13:30,514] INFO - Process psutil.Process(pid=20649 (terminated)) (20649) terminated with exit code None
[2018-12-28 09:13:30,514] INFO - Process psutil.Process(pid=11530 (terminated)) (11530) terminated with exit code None
[2018-12-28 09:13:30,515] INFO - [2018-12-28 09:13:30,515] INFO - Task exited with return code 0
However, it manages to send the emails (sometimes) when I comment out the two subprocess lines, as shown below.
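This is the variant that (sometimes) manages to send the email, with the two subprocess calls commented out:

def success_email(contextDict, **kwargs):
    """Send custom email alerts."""
    title = "Task {} SUCCEEDED. Execution date: {}".format(
        contextDict['task'].task_id, contextDict['execution_date'])
    body = """
    <br>
    The correspondent log file:
    <br>
    {}
    """.format(hdfs_log)
    print("Uploading log to hdfs")
    # subprocess.check_call(["hdfs", "dfs", "-mkdir", "-p", hdfs_log_folder])
    # subprocess.check_call(["hdfs", "dfs", "-put", local_log, hdfs_log])
    send_email('email@domain.com', title, html_content=body)

Any ideas how to solve this problem?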