оператор bash внутри composer для копирования самых последних файлов из одного сегмента GCS в другой - PullRequest
0 голосов
/ 02 января 2019

Я написал следующий скрипт DAG (код Python), который запускает BashOperator. Это странно, потому что команда bash работает гладко, когда я запускаю команду bash в свой терминал. Но это та же команда, когда я оборачиваю ее в BashOperatgor в DAG Airflow.

Цель этого кода - скопировать последние файлы (на сегодняшний день) из одного сегмента GCS в другой. Ниже приведен код:

из airflow.operators.bash_operator import BashOperator от воздушного потока импорт DAG с даты и времени импорта дата и время

DEFAULT_DAG_ARGS = {
    'owner': 'name',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'retries': 0,
    'schedule_interval': None
}

with DAG('copy_input_files', default_args=DEFAULT_DAG_ARGS) as dag:
    pre_dag_cp = BashOperator(
        task_id='copy_current_files',
        bash_command="gsutil -m ls -l gs://input/files/UES | grep $(date -I) | sed 's/.*\(gs:\/\/\)/\1/'| gsutil cp -I  gs://output/recent_files "  + "\nexit 0"
    )

Я получаю следующую ошибку: CommandException: No URLs matched: input/files/UES/TV11_INFODEB.2019_01_02_02.orc и никакие файлы не копируются, как ожидалось, когда я тестирую команду bash за пределами dag в базовом терминале, это работает. Любая идея, как это исправить, пожалуйста,

1 Ответ

0 голосов
/ 02 января 2019

Пожалуйста, обратитесь к выделенному оператору для этой задачи.

from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator

            copy_single_file = GoogleCloudStorageToGoogleCloudStorageOperator(
                task_id='copy_single_file',
                source_bucket='data',
                source_object='sales/sales-2017/january.avro',
                destination_bucket='data_backup',
                destination_object='copied_sales/2017/january-backup.avro',
                google_cloud_storage_conn_id=google_cloud_conn_id
            )

Ссылка: https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/gcs_to_gcs.py

...