Почему Airflow / Composer Макросы не интерполируются / не интерпретируются? - PullRequest
0 голосов
/ 20 апреля 2020

Я не уверен, что это нормальное поведение, но я пытаюсь запустить команду, используя gsutil в python_callable в BranhPythonOperator ... Эта команда хорошо работает, когда я явно использую ее на своем терминале с жестко закодированные пути в GCS, но однажды я пытаюсь запустить его в своей группе обеспечения доступности баз данных, используя {{ds_nodash}} и {{run_id}} (макросы воздушного потока). Поток воздуха не интерпретирует их, как вы можете видеть в журналах ниже.

Composer Logs

Вот код в моем определении DAG

with DAG("DAG_NAME", default_args=default_args, schedule_interval="@hourly", catchup=False) as dag:
    # Buckets
    airflow_bucket = "XXXXX"  # Hidden on purpose
    archive_bucket = "YYYYY"  # Hidden on purpose

    # Paths
    raw_data_path = "raw_data/tc_export/raw/{{ds_nodash}}/{{run_id}}/*"
    airflow_local_dir = "/home/airflow/gcs/data/tc_data/"

    # SFTP & dirs
    sftp_key = "KEY"  # Hidden on purpose
    sftp_remote_directory_root = '/data/from_tc/'

    op_check_if_files_in_sftp = BranchPythonOperator(
        task_id='check_if_files_in_sftp',
        provide_context=True,
        python_callable=check_if_files_in_sftp,
        op_kwargs={'remote_directory_root': sftp_remote_directory_root},
        templates_dict={"sftp_key": sftp_key})
    op_check_if_files_in_bucket = BranchPythonOperator(
        task_id='check_if_files_in_bucket',
        provide_context=True,
        python_callable=check_if_files_in_bucket,
        op_kwargs={'bucket': archive_bucket, 'subdir': raw_data_path})

А вот функция, которая выполняет gsutil

def check_if_files_in_bucket(bucket: str, subdir: str, **kwargs) -> str:
    """
    Check if files already exist in the archives' bucket.

    :param bucket: bucket in which to search
    :param subdir: directory within the bucket
    :param kwargs: additional context parameters.
    :return: id of the next DAG operator
    """
    try:
        logging.info(f"Executing command : gsutil -q  stat gs://{bucket}/{subdir}")
        command = subprocess.run(["gsutil", "-q", "stat", f"gs://{bucket}/{subdir}"])
        if command.returncode:
            logging.info(f"Command return code : {command.returncode}. Ending process.")
            return "end_process"
        logging.info(f"There are files within the {bucket}/{subdir}. Proceeding with the next step.")
        return "transfer_to_other_bucket"
    except OSError as os_err:
        logging.exception(os_err)
        exit(1)
    except ValueError as val_err:
        logging.exception(val_err)
        exit(1)

Итак, мои вопросы:

  1. Когда Airflow интерпретирует макросы?
  2. Как это исправить?

1 Ответ

1 голос
/ 27 апреля 2020

Проблема здесь связана с неиспользованием аргумента templates_dict в BranchPythonOperator. Вот исправленный код:

op_check_if_files_in_bucket = BranchPythonOperator(task_id='check_if_files_in_bucket',
                                             provide_context=True,
                                             python_callable=check_if_files_in_bucket,
                                             op_kwargs={'bucket': archive_bucket},
                                             templates_dict={'subdir': raw_data_path})

И функция python_callable:

def check_if_files_in_bucket(bucket: str, **kwargs) -> None:
    """
    Check if files already exist in the archives' bucket.

    :param bucket: bucket in which to search
    :param kwargs: additional context parameters, and subdirectory in bucket.
    :return: None
    """
    try:
        subdir = kwargs["templates_dict"]["subdir"]
        cmd_check_files = ["gsutil", "-q", "stat", f"gs://{bucket}/{subdir}"]

        logging.info(f"Executing command : {' '.join(cmd_check_files)}")
        command = subprocess.run(cmd_check_files)
        if command.returncode:
            logging.info(f"Command return code : {command.returncode}. Ending process.")
            return "end_process"
        logging.info(f"There are files within the {bucket}/{subdir}. Proceeding with the next step.")
        return "transfer_to_other_bucket"
    except OSError as os_err:
        logging.exception(os_err)
        exit(1)
    except ValueError as val_err:
        logging.exception(val_err)
        exit(1)

Примечание: поскольку BranchPythonOperator расширяет PythonOperator, применяется то же правило.

...