GoogleCloudStorageToBigQueryOperator source_objects для получения списка через XCom - PullRequest
0 голосов
/ 30 августа 2018

Я хотел бы передать список строк, содержащих имена файлов в хранилище Google, в XCom. Позже будет подхвачен задачей GoogleCloudStorageToBigQueryOperator. Поле source_objects является шаблонным, поэтому можно использовать шаблоны Jinja. К сожалению, Джинджа может вернуть только строку, и поэтому я не могу передать список в XCom.

Как я могу использовать список XCom в GoogleCloudStorageToBigQueryOperator?

Ссылка на аналогичный вопрос, решаемый с помощью provide_context : Передача списка строк в качестве параметра зависимой задачи в Airflow

Самое близкое решение, которое я нашел, - это создание класса-обертки и отправка идентификатора задачи, опубликовавшей xcom, следующим образом:

@apply_defaults
def __init__(self, source_objects_task_id,
....
def execute(self, context):
    source_objects = context['ti']
          .xcom_pull(task_ids=self.source_objects_task_id)
    operator = GoogleCloudStorageToBigQueryOperator(
          source_objects=source_objects,
          dag=self.dag,
....
)

    operator.execute(context)

1 Ответ

0 голосов
/ 30 августа 2018

Не знаю, как получить список объектов Google Cloud Storage, но если вы делаете это с помощью GoogleCloudStorageListOperator, вы можете вместо этого передавать символы подстановки в source_objects параметры в GoogleCloudStorageToBigQueryOperator так же, как в BigQuery Web. интерфейс:

GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    source_objects=['folder1/*.csv', 'folder2/*.csv'],
    destination_project_dataset_table='dest_table',
    schema_object='gs://test-bucket/schema.json',
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    bigquery_conn_id='bq-conn',
    google_cloud_storage_conn_id='gcp-conn',
    dag=dag
)

Если вы хотите получить список из другой задачи, используя xcom, вы можете создать новый оператор или плагин Airflow для GoogleCloudStorageToBigQueryOperator, добавив новый параметр source_objects_task_id, удалив параметр source_objects и просто заменив следующее код (строки 203 и 204: https://github.com/apache/incubator-airflow/blob/ac9033db0981ae1f770a8bdb5597055751ab15bd/airflow/contrib/operators/gcs_to_bq.py#L203-L204):

source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
               for source_object in self.source_objects]

с

source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
               for source_object in context['ti'].xcom_pull(task_ids=self.source_objects_task_id)]

и используйте его следующим образом:

GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    source_objects_task_id='task-id-of-previos-task',
    destination_project_dataset_table='dest_table',
    schema_object='gs://test-bucket/schema.json',
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    bigquery_conn_id='bq-conn',
    google_cloud_storage_conn_id='gcp-conn',
    dag=dag
)
...