Задание потока воздуха запускается при каждой повторной попытке - PullRequest
0 голосов
/ 13 января 2020

Я использую Airflow для запуска задачи DockerOperator раз в день. Поскольку аргумент auto_remove устарел, я использую BashOperator с командой docker rm, чтобы очистить покинутый контейнер. Эта команда выполняется как последующая задача, которая должна выполняться после задачи оператора docker. Я искал способ всегда выполнять последующую задачу, даже если родитель не удался. Я обнаружил, что аргумент trigger_rule был тем, что я искал с опцией all_done, но если контейнер останавливается с кодом ошибки и задача повторяется, то последующая задача не выполняется, контейнер остается и следующая попытка не удалась сразу, потому что имя контейнера уже существует. Есть ли способ запускать последующую задачу при каждой повторной попытке?

Примечание: я не хочу удалять имя контейнера, поскольку я не хочу, чтобы контейнеры накапливались.

with DAG(DAG_ID, default_args=default_args, schedule_interval=SCHEDULE_INTERVAL, catchup=CATCHUP) as dag:

    docker_launch_container = DockerOperator(
        task_id='docker_launch_container',
        image='my_custom_image',
        api_version='auto',
        container_name='my_custom_container',
    )

    docker_remove_container = BashOperator(
        task_id='docker_remove_container',
        bash_command="docker rm $(docker ps -a | grep my_custom_container | grep Exited | awk '{print $1}')",
        trigger_rule='all_done'
    )

    docker_launch_container >> docker_remove_container
...