Apache Airflow: Как запустить Docker Оператор с переменными окружения из другой задачи? - PullRequest
0 голосов
/ 06 мая 2020

Я хочу запустить оператор Docker в Airflow с переменной окружения download_path , которая устанавливается в предыдущей задаче. Как я могу этого добиться? Через Xcom? Минимальный пример:

# define python function
def make_folder(folder_path:str, date:str):
    download_path= folder_path + date 
    return(download_path)
    os.mkdir(download_path)

# Python operator
task_1 = PythonOperator(
    task_id="make_folder",
    provide_context=False,
    python_callable=make_folder,
    op_kwargs={'folder_path': '/my_path_', 'date': 'str(datetime.date(datetime.today()))'},
    xcom_push=True,
    xcom_all=True
)

# docker operator with download_path as a necessary env variable
task_2 = DockerOperator(
    task_id='docker',
    image='file_processor:latest',
    api_version='auto',
    auto_remove=False,
    command='',
    environment={
        'DPATH': **download_path**
    },
    docker_url="unix://var/run/docker.sock",
    network_mode="bridge",
    xcom_push=True,
    xcom_all=True
)

task_1 >> task_2

Ответы [ 2 ]

1 голос
/ 06 мая 2020

Да, вам нужно использовать Xcom. Попробуйте следующее:

task_2 = DockerOperator(
                task_id='docker',
                image='file_processor:latest',
                api_version='auto',
                auto_remove=False,
                command='',
                environment={
                        'DPATH': '{{ti.xcom_pull(task_ids='make_folder') }}'
                },
                docker_url="unix://var/run/docker.sock",
                network_mode="bridge",
                xcom_push=True,
                xcom_all=True
)

Если вышеуказанное не работает, передайте оператору идентификатор предыдущей задачи (make_folder) и попробуйте извлечь оттуда:

context['ti'].xcom_pull(prev_task_id)
0 голосов
/ 06 мая 2020

Вы бы действительно использовали xcom. Для этой цели объект task_instance предоставляет два нескольких метода: xcom_push и xcom_pull.

Чтобы pu sh значение xcom, вам необходимо предоставить контекст для вашего "python сопоставимая "функция. Это сделает объект task_instance доступным для функции.

Поле environment для DockerOperator является шаблоном. Это означает, что он поддерживает макросы . Вы можете использовать макрос {{ task_instance }}.

def make_folder(folder_path:str, date:str, **context):
    download_path= folder_path + date 
    os.mkdir(download_path)
    task_instance = context['task_instance']
    task_instance.xcom_push(key="download_path", value=download_path)


task_1 = PythonOperator(
    task_id="make_folder",
    provide_context=True,
    python_callable=make_folder,
    op_kwargs={'folder_path': '/my_path_', 'date':'str(datetime.date(datetime.today()))'}
)


task_2 = DockerOperator(
                task_id='docker',
                image='file_processor:latest',
                api_version='auto',
                auto_remove=False,
                command='',
                environment={
                  'DPATH': "{{ task_instance.xcom_pull(task_ids='make_folder', key='download_path') }}"
                },
                docker_url="unix://var/run/docker.sock",
                network_mode="bridge"
)

task_1 >> task_2
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...