Я написал следующий скрипт DAG (код Python), который запускает BashOperator. Это странно, потому что команда bash работает гладко, когда я запускаю команду bash в свой терминал. Но это та же команда, когда я оборачиваю ее в BashOperatgor в DAG Airflow.
Цель этого кода - скопировать последние файлы (на сегодняшний день) из одного сегмента GCS в другой. Ниже приведен код:
из airflow.operators.bash_operator import BashOperator
от воздушного потока импорт DAG
с даты и времени импорта дата и время
DEFAULT_DAG_ARGS = {
'owner': 'name',
'depends_on_past': False,
'start_date': datetime.now(),
'retries': 0,
'schedule_interval': None
}
with DAG('copy_input_files', default_args=DEFAULT_DAG_ARGS) as dag:
pre_dag_cp = BashOperator(
task_id='copy_current_files',
bash_command="gsutil -m ls -l gs://input/files/UES | grep $(date -I) | sed 's/.*\(gs:\/\/\)/\1/'| gsutil cp -I gs://output/recent_files " + "\nexit 0"
)
Я получаю следующую ошибку: CommandException: No URLs matched: input/files/UES/TV11_INFODEB.2019_01_02_02.orc
и никакие файлы не копируются, как ожидалось, когда я тестирую команду bash за пределами dag в базовом терминале, это работает. Любая идея, как это исправить, пожалуйста,