Я пытался создать DAG, которая состоит из двух PythonOperators. Один из которых запускает простой bash сценарий, а другой просто выводит выходные данные в журналы. Я использовал этот метод для запуска некоторых сложных сценариев bash с использованием оператора python в течение достаточно долгого времени, и они работали нормально, пока я не обновил воздушный поток с 1.10.6 до 1.10.7. Теперь PythonOperator продолжает работать бесконечно, в то же время сообщая, что последний тактовый сигнал из планировщика был получен 10 минут go, хотя первая функция работала нормально и планировщик все время находился в рабочем состоянии.
Я пытался оставив задачу на некоторое время запущенной, в надежде, что она будет выполнена, но в конечном итоге задание не выполнено, показывая, что поток воздуха должен был выполнить SIGKILL задачу, поскольку она не отвечала. Ниже приведен код самого простого DAG, который я пробовал до сих пор:
import airflow
import os
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
'provide_context': True,
}
dag = DAG('Test_Job', schedule_interval="@once", default_args=args)
def a(**kwargs):
command_to_execute = "scalac --version"
stream = os.popen(command_to_execute)
output = stream.read()
print("Scalac done")
print(output)
def b(**kwargs):
print("Hi")
A = PythonOperator(task_id='A', python_callable=a, dag = dag)
B = PythonOperator(task_id='B', python_callable=b, dag = dag)
A.set_upstream(B)
if __name__ == ("__main__"):
b()
a()
Я вернулся к 1.10.6, когда эта проблема сохранилась, и теперь она работает, как и ожидалось. Означает ли это, что этот метод несовместим с потоком воздуха 1.10.7? Если да, то как это может быть достигнуто в 1.10.7?
PS: я знаю, что в идеале BashOperator должен использоваться для запуска скрипта bash, но из-за некоторых требований к продукту я должен это сделать используя PythonOperator.