У меня есть простой apache лучевой конвейер, который читает bigquery >> создает файл паркета в GCS >> Из пути GCS я читаю информацию метаданных и получаю объект JSON с пользовательской функцией ParDo >> I написать как текст обратно в GCS.
Нет ошибки во время выполнения, но, когда я проверяю файл, созданный в целевом пути GCS, он имеет 0 байт . Я проверил, пуста ли Pcollection с помощью функции beam.ParDo (print) . Когда я делаю это, он печатает объект JSON. Я уже пытался преобразовать объект как str, list и dict. Все хорошо работает с оператором beam.ParDo (print) , но не может записать какие-либо данные в GCS. Пожалуйста, дайте мне знать, в чем моя ошибка? Заранее спасибо:)
class CreateMetadata(beam.DoFn):
def process(self, element, *args, **kwargs):
# element here gets a GCS path as gs://.....
### Some internal process ###
# This function is called only once at the end of the internal
# process have a JSON object as entity data
yield entity_data
with beam.Pipeline(options=pipeline_options) as pipeline:
table = (
pipeline | 'ReadTableFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=bigquery.read_sql(query_string=query),
use_standard_sql=True))
| 'writeAsParquetToGCS' >> pq.WriteToParquet(OUTPUT, schema=bigquery.get_parquet_schema_from_bq(),
num_shards=1)
| 'CreateMetadata' >> beam.ParDo(CreateMetadata())
# | beam.ParDo(print)
| 'WriteToGCS' >> beam.io.WriteToText(file_path_prefix=OUTPUT.replace('Some GCS path'), file_name_suffix='.json',num_shards=1)
# Now after the pipeline runs when I check the GCS path there is a file created with
# 0 bytes???
)```