Не знаю, как получить список объектов Google Cloud Storage, но если вы делаете это с помощью GoogleCloudStorageListOperator
, вы можете вместо этого передавать символы подстановки в source_objects
параметры в GoogleCloudStorageToBigQueryOperator
так же, как в BigQuery Web. интерфейс:
GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
task_id='gcs_to_bq',
bucket='test_bucket',
source_objects=['folder1/*.csv', 'folder2/*.csv'],
destination_project_dataset_table='dest_table',
schema_object='gs://test-bucket/schema.json',
source_format='CSV',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id='bq-conn',
google_cloud_storage_conn_id='gcp-conn',
dag=dag
)
Если вы хотите получить список из другой задачи, используя xcom
, вы можете создать новый оператор или плагин Airflow для GoogleCloudStorageToBigQueryOperator
, добавив новый параметр source_objects_task_id
, удалив параметр source_objects
и просто заменив следующее код (строки 203 и 204: https://github.com/apache/incubator-airflow/blob/ac9033db0981ae1f770a8bdb5597055751ab15bd/airflow/contrib/operators/gcs_to_bq.py#L203-L204):
source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
for source_object in self.source_objects]
с
source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
for source_object in context['ti'].xcom_pull(task_ids=self.source_objects_task_id)]
и используйте его следующим образом:
GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
task_id='gcs_to_bq',
bucket='test_bucket',
source_objects_task_id='task-id-of-previos-task',
destination_project_dataset_table='dest_table',
schema_object='gs://test-bucket/schema.json',
source_format='CSV',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id='bq-conn',
google_cloud_storage_conn_id='gcp-conn',
dag=dag
)