Beam на SparkRunner перезаписывает свой собственный вывод - PullRequest
1 голос
/ 22 октября 2019

Я использую конвейер Beam на SparkRunner с выходными данными файла Parquet (хотя проблема существует, если я выполняю другие выходные операции ввода-вывода). У меня проблема в том, что при выводе копия файла перезаписывает свой собственный вывод. Вот вывод журнала:

19/10/22 18:26:35 INFO FileBasedSink: Will copy temporary file FileResult{tempFilename=/home/hadoop/just1hour/.temp-beam-357d6916-5d8e-4519-a7a4-3852249011b5/77100cd1-04ae-441c-848f-e0d0067feeb8, shard=0, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@5316e95f,
 paneInfo=PaneInfo.NO_FIRING} to final location /home/hadoop/just1hour/output-00000-of-00001
19/10/22 18:26:35 INFO FileBasedSink: Will copy temporary file FileResult{tempFilename=/home/hadoop/just1hour/.temp-beam-357d6916-5d8e-4519-a7a4-3852249011b5/f19819df-e006-431a-8ccd-6e67af692c3e, shard=0, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@5316e95f,
 paneInfo=PaneInfo.NO_FIRING} to final location /home/hadoop/just1hour/output-00000-of-00001
19/10/22 18:26:35 INFO FileBasedSink: Will copy temporary file FileResult{tempFilename=/home/hadoop/just1hour/.temp-beam-357d6916-5d8e-4519-a7a4-3852249011b5/cb2abe0c-8cc2-4a94-ae54-97b67c5e7d20, shard=0, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@5316e95f,
 paneInfo=PaneInfo.NO_FIRING} to final location /home/hadoop/just1hour/output-00000-of-00002
19/10/22 18:26:35 INFO FileBasedSink: Will copy temporary file FileResult{tempFilename=/home/hadoop/just1hour/.temp-beam-357d6916-5d8e-4519-a7a4-3852249011b5/611d194a-4e8f-44bc-8776-4bc2c55a8f34, shard=1, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@5316e95f,
 paneInfo=PaneInfo.NO_FIRING} to final location /home/hadoop/just1hour/output-00001-of-00002

Как видите, первый файл перезаписывается.

Мне удалось обойти это, вручную указав количество сегментов, равное количеству входных файлов, но мне интересно, есть ли другие конфигурации, которые могли бы объяснить или предотвратить это поведение.

Редактировать:

Это пакетное задание, и вот код, который генерирует вывод:

p.apply(TextIO.read().from(input).withDelimiter("{".getBytes()))
                .apply(Filter.by((String record) -> !record.isEmpty()))
                .apply(ParDo.of(new ParseNotificationJSON())).setCoder(AvroCoder.of(SCHEMA))
                .apply("Write Parquet files",
                        FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to(output));

        p.run().waitUntilFinish();

1 Ответ

0 голосов
/ 29 октября 2019

Приемники на основе FileIO в Beam перезаписывают файлы, если какой-либо файл имеет такое же имя в целевом каталоге. В именах файлов по умолчанию для ограниченных источников также используются shard-Index и shard-number в именах файлов, поэтому при использовании .withNumShards(0) будет использоваться шардинг, определенный бегуном. Если вы используете .withNumShards(0) в своей раковине, она должна работать нормально.

...