Объединение двух общих записей в одну и запись в файл паркета - PullRequest
1 голос
/ 14 октября 2019

В общем, идея состоит в том, чтобы прочитать файл паркета (который возвращает PCollection), сделать новую PCollection в зависимости от файла, который мы прочитали, объединить эти два PCollections в один и записать его в файл паркета в другом месте.

Я пытаюсь этот код на SparkRunner.

Я не могу опубликовать полный код, но я попытаюсь указать логику.

Это фрагмент кода:

PCollection<GenericRecord> oldGen = pipeline.apply(ParquetIO.read(SCHEMA).from(/path);
PCollection<GenericRecord> newGen = processNew(); //in this part I am making new PCollection
PCollectionList<GenericRecord> pList = PCollectionList.of(oldGen).and(newGen);

pList
     .apply(Flatten.pCollections())
     .setCoder(AvroCoder.of(GenericRecord.class, SCHEMA))
     .apply(FileIO.<GenericRecord>write()
         .via(ParquetIO.sink(SCHEMA)).to(/outputlocation));

Когда я проверяю выходное местоположение, я получаю только одну PC-коллекцию из тех двух, которые должны быть сведены (без правил, какой из них). Я попытался сделать несколько отпечатков после сглаживания коллекций, и печать выглядит нормально, также я попытался записать эти данные в формат .txt, и это тоже отлично работает.

Кроме того, я попробовал один глупыйРешение, и, к моему удивлению, оно прошло как-то. Параметр oldGen имеет один столбец с логическим флагом. Я преобразовал эту коллекцию в две коллекции: одна с флагом true, а другая с флагом false.

Фрагмент для PCollectionList выглядит следующим образом:

PCollectionList<GenericRecord> pList = PCollectionList.of(newGen).and(trueGen).and(falseGen);

Запись в паркет выглядит так же, как в предыдущем фрагменте, но с этим фрагментом я получаю хороший файл паркета со всеми необходимыми записями.

Это выглядит действительно странно, потому что в обеих ситуациях я получаю одну и ту же коллекцию PC, когда выкладываю эти PCollections, и еще одна вещь, когда я запускаю ее на DirectRunner, все работает без проблем.

Версия Beam - 2.14.0, а версия Spark - 2.2.3.

У кого-нибудь есть идеи, почему это происходит в первом случае?

...