Я не уверен, что это нормальное поведение, но я пытаюсь запустить команду, используя gsutil
в python_callable
в BranhPythonOperator
... Эта команда хорошо работает, когда я явно использую ее на своем терминале с жестко закодированные пути в GCS
, но однажды я пытаюсь запустить его в своей группе обеспечения доступности баз данных, используя {{ds_nodash}}
и {{run_id}}
(макросы воздушного потока). Поток воздуха не интерпретирует их, как вы можете видеть в журналах ниже.
![Composer Logs](https://i.stack.imgur.com/xYJ3u.png)
Вот код в моем определении DAG
with DAG("DAG_NAME", default_args=default_args, schedule_interval="@hourly", catchup=False) as dag:
# Buckets
airflow_bucket = "XXXXX" # Hidden on purpose
archive_bucket = "YYYYY" # Hidden on purpose
# Paths
raw_data_path = "raw_data/tc_export/raw/{{ds_nodash}}/{{run_id}}/*"
airflow_local_dir = "/home/airflow/gcs/data/tc_data/"
# SFTP & dirs
sftp_key = "KEY" # Hidden on purpose
sftp_remote_directory_root = '/data/from_tc/'
op_check_if_files_in_sftp = BranchPythonOperator(
task_id='check_if_files_in_sftp',
provide_context=True,
python_callable=check_if_files_in_sftp,
op_kwargs={'remote_directory_root': sftp_remote_directory_root},
templates_dict={"sftp_key": sftp_key})
op_check_if_files_in_bucket = BranchPythonOperator(
task_id='check_if_files_in_bucket',
provide_context=True,
python_callable=check_if_files_in_bucket,
op_kwargs={'bucket': archive_bucket, 'subdir': raw_data_path})
А вот функция, которая выполняет gsutil
def check_if_files_in_bucket(bucket: str, subdir: str, **kwargs) -> str:
"""
Check if files already exist in the archives' bucket.
:param bucket: bucket in which to search
:param subdir: directory within the bucket
:param kwargs: additional context parameters.
:return: id of the next DAG operator
"""
try:
logging.info(f"Executing command : gsutil -q stat gs://{bucket}/{subdir}")
command = subprocess.run(["gsutil", "-q", "stat", f"gs://{bucket}/{subdir}"])
if command.returncode:
logging.info(f"Command return code : {command.returncode}. Ending process.")
return "end_process"
logging.info(f"There are files within the {bucket}/{subdir}. Proceeding with the next step.")
return "transfer_to_other_bucket"
except OSError as os_err:
logging.exception(os_err)
exit(1)
except ValueError as val_err:
logging.exception(val_err)
exit(1)
Итак, мои вопросы:
- Когда Airflow интерпретирует макросы?
- Как это исправить?