В общем, идея состоит в том, чтобы прочитать файл паркета (который возвращает PCollection), сделать новую PCollection в зависимости от файла, который мы прочитали, объединить эти два PCollections в один и записать его в файл паркета в другом месте.
Я пытаюсь этот код на SparkRunner.
Я не могу опубликовать полный код, но я попытаюсь указать логику.
Это фрагмент кода:
PCollection<GenericRecord> oldGen = pipeline.apply(ParquetIO.read(SCHEMA).from(/path);
PCollection<GenericRecord> newGen = processNew(); //in this part I am making new PCollection
PCollectionList<GenericRecord> pList = PCollectionList.of(oldGen).and(newGen);
pList
.apply(Flatten.pCollections())
.setCoder(AvroCoder.of(GenericRecord.class, SCHEMA))
.apply(FileIO.<GenericRecord>write()
.via(ParquetIO.sink(SCHEMA)).to(/outputlocation));
Когда я проверяю выходное местоположение, я получаю только одну PC-коллекцию из тех двух, которые должны быть сведены (без правил, какой из них). Я попытался сделать несколько отпечатков после сглаживания коллекций, и печать выглядит нормально, также я попытался записать эти данные в формат .txt, и это тоже отлично работает.
Кроме того, я попробовал один глупыйРешение, и, к моему удивлению, оно прошло как-то. Параметр oldGen имеет один столбец с логическим флагом. Я преобразовал эту коллекцию в две коллекции: одна с флагом true, а другая с флагом false.
Фрагмент для PCollectionList выглядит следующим образом:
PCollectionList<GenericRecord> pList = PCollectionList.of(newGen).and(trueGen).and(falseGen);
Запись в паркет выглядит так же, как в предыдущем фрагменте, но с этим фрагментом я получаю хороший файл паркета со всеми необходимыми записями.
Это выглядит действительно странно, потому что в обеих ситуациях я получаю одну и ту же коллекцию PC, когда выкладываю эти PCollections, и еще одна вещь, когда я запускаю ее на DirectRunner, все работает без проблем.
Версия Beam - 2.14.0, а версия Spark - 2.2.3.
У кого-нибудь есть идеи, почему это происходит в первом случае?