Выполнять сценарии Windows Python через контейнер док-станции воздушного потока - PullRequest
0 голосов
/ 27 ноября 2018

Я работаю на Windows, с настройкой Airflow через Docker.У меня есть несколько скриптов Python в окнах, которые читают и пишут из разных мест в окнах (SSH-соединения, папки Windows и т. Д.).Будет много работы по репликации всех этих входных данных в моем образе докера, и поэтому я хочу, чтобы Airflow выполнял эти сценарии, как будто они работают в Windows.

Возможно ли это, если да, то как?

Вот сценарий, который я запускаю как мой DAG:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# Following are defaults which can be overridden later on
default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': datetime(2018, 11, 27),
    'email': ['test@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('Helloworld', default_args=default_args)

###########################################################
# Here's where I want to execute my windows python script #
###########################################################
t1=PythonOperator(dag=dag,
               task_id='my_task_powered_by_python',
               provide_context=False,
               python_callable=r"C:\Users\user\Documents\script.py")

t2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Hello World from Task 2"',
    dag=dag)

t3 = BashOperator(
    task_id='task_3',
    bash_command='echo "Hello World from Task 3"',
    dag=dag)

t4 = BashOperator(
    task_id='task_4',
    bash_command='echo "Hello World from Task 4"',
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream(t2)
t4.set_upstream(t3)

1 Ответ

0 голосов
/ 27 ноября 2018

У вас есть путь к скрипту Python, переданный в PythonOperator, PythonOperator ищет объект кода Python, а не путь к файлу скрипта.

У вас есть два варианта вызова этих сценариев Python.

Вызов сценария напрямую через Bash и BashOperator

Вы можете использовать BashOperator каквыше, чтобы вызвать скрипт Python напрямую.

Вы можете выполнить вызов скрипта Python точно так же, как если бы вы не использовали Airflow, используя следующую команду в вашем BashOperator

python script.py

Переместить сценарий и использовать PythonOperator

Если эти сценарии вызываются только из Airflow, я бы рассмотрел их перемещение в базу кода Python и вызов любой функции точки входа, которая у вас есть по мере необходимости.

Ex

airflowHome/
  dags/
  plugins/
  scripts/
    __init__.py
    script1.py
    script2.py

Теперь вы сможете получить доступ к своим сценариям в модуле сценариев с помощью импорта Python.Оттуда вы можете вызывать определенную функцию из вашей DAG, используя PythonOperator.

...