Я использую конвейер Beam на SparkRunner с выходными данными файла Parquet (хотя проблема существует, если я выполняю другие выходные операции ввода-вывода). У меня проблема в том, что при выводе копия файла перезаписывает свой собственный вывод. Вот вывод журнала:
19/10/22 18:26:35 INFO FileBasedSink: Will copy temporary file FileResult{tempFilename=/home/hadoop/just1hour/.temp-beam-357d6916-5d8e-4519-a7a4-3852249011b5/77100cd1-04ae-441c-848f-e0d0067feeb8, shard=0, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@5316e95f,
paneInfo=PaneInfo.NO_FIRING} to final location /home/hadoop/just1hour/output-00000-of-00001
19/10/22 18:26:35 INFO FileBasedSink: Will copy temporary file FileResult{tempFilename=/home/hadoop/just1hour/.temp-beam-357d6916-5d8e-4519-a7a4-3852249011b5/f19819df-e006-431a-8ccd-6e67af692c3e, shard=0, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@5316e95f,
paneInfo=PaneInfo.NO_FIRING} to final location /home/hadoop/just1hour/output-00000-of-00001
19/10/22 18:26:35 INFO FileBasedSink: Will copy temporary file FileResult{tempFilename=/home/hadoop/just1hour/.temp-beam-357d6916-5d8e-4519-a7a4-3852249011b5/cb2abe0c-8cc2-4a94-ae54-97b67c5e7d20, shard=0, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@5316e95f,
paneInfo=PaneInfo.NO_FIRING} to final location /home/hadoop/just1hour/output-00000-of-00002
19/10/22 18:26:35 INFO FileBasedSink: Will copy temporary file FileResult{tempFilename=/home/hadoop/just1hour/.temp-beam-357d6916-5d8e-4519-a7a4-3852249011b5/611d194a-4e8f-44bc-8776-4bc2c55a8f34, shard=1, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@5316e95f,
paneInfo=PaneInfo.NO_FIRING} to final location /home/hadoop/just1hour/output-00001-of-00002
Как видите, первый файл перезаписывается.
Мне удалось обойти это, вручную указав количество сегментов, равное количеству входных файлов, но мне интересно, есть ли другие конфигурации, которые могли бы объяснить или предотвратить это поведение.
Редактировать:
Это пакетное задание, и вот код, который генерирует вывод:
p.apply(TextIO.read().from(input).withDelimiter("{".getBytes()))
.apply(Filter.by((String record) -> !record.isEmpty()))
.apply(ParDo.of(new ParseNotificationJSON())).setCoder(AvroCoder.of(SCHEMA))
.apply("Write Parquet files",
FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to(output));
p.run().waitUntilFinish();