Я использую Airflow для запуска задачи DockerOperator
раз в день. Поскольку аргумент auto_remove
устарел, я использую BashOperator
с командой docker rm
, чтобы очистить покинутый контейнер. Эта команда выполняется как последующая задача, которая должна выполняться после задачи оператора docker. Я искал способ всегда выполнять последующую задачу, даже если родитель не удался. Я обнаружил, что аргумент trigger_rule был тем, что я искал с опцией all_done
, но если контейнер останавливается с кодом ошибки и задача повторяется, то последующая задача не выполняется, контейнер остается и следующая попытка не удалась сразу, потому что имя контейнера уже существует. Есть ли способ запускать последующую задачу при каждой повторной попытке?
Примечание: я не хочу удалять имя контейнера, поскольку я не хочу, чтобы контейнеры накапливались.
with DAG(DAG_ID, default_args=default_args, schedule_interval=SCHEDULE_INTERVAL, catchup=CATCHUP) as dag:
docker_launch_container = DockerOperator(
task_id='docker_launch_container',
image='my_custom_image',
api_version='auto',
container_name='my_custom_container',
)
docker_remove_container = BashOperator(
task_id='docker_remove_container',
bash_command="docker rm $(docker ps -a | grep my_custom_container | grep Exited | awk '{print $1}')",
trigger_rule='all_done'
)
docker_launch_container >> docker_remove_container