Выполнение скрипта bash в PythonOperator работает в airflow 1.10.6, а не в 1.10.7 - PullRequest
1 голос
/ 30 января 2020

Я пытался создать DAG, которая состоит из двух PythonOperators. Один из которых запускает простой bash сценарий, а другой просто выводит выходные данные в журналы. Я использовал этот метод для запуска некоторых сложных сценариев bash с использованием оператора python в течение достаточно долгого времени, и они работали нормально, пока я не обновил воздушный поток с 1.10.6 до 1.10.7. Теперь PythonOperator продолжает работать бесконечно, в то же время сообщая, что последний тактовый сигнал из планировщика был получен 10 минут go, хотя первая функция работала нормально и планировщик все время находился в рабочем состоянии.

Я пытался оставив задачу на некоторое время запущенной, в надежде, что она будет выполнена, но в конечном итоге задание не выполнено, показывая, что поток воздуха должен был выполнить SIGKILL задачу, поскольку она не отвечала. Ниже приведен код самого простого DAG, который я пробовал до сих пор:

import airflow
import os
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True,
}

dag = DAG('Test_Job', schedule_interval="@once", default_args=args)

def a(**kwargs):
    command_to_execute = "scalac --version"
    stream = os.popen(command_to_execute)
    output = stream.read()
    print("Scalac done")
    print(output)

def b(**kwargs):
    print("Hi")



A = PythonOperator(task_id='A', python_callable=a, dag = dag)
B = PythonOperator(task_id='B', python_callable=b, dag = dag)

A.set_upstream(B)

if __name__ == ("__main__"):
    b()
    a()

Я вернулся к 1.10.6, когда эта проблема сохранилась, и теперь она работает, как и ожидалось. Означает ли это, что этот метод несовместим с потоком воздуха 1.10.7? Если да, то как это может быть достигнуто в 1.10.7?

PS: я знаю, что в идеале BashOperator должен использоваться для запуска скрипта bash, но из-за некоторых требований к продукту я должен это сделать используя PythonOperator.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...