Apache Spark записывает на несколько выходов [разные схемы паркета] без кэширования - PullRequest
0 голосов
/ 01 марта 2019

Я хочу преобразовать свои входные данные (файлы XML) и произвести 3 разных вывода.

Каждый вывод будет в формате паркета и будет иметь различную схему / количество столбцов.

В настоящее время в моем решении данные хранятся в RDD[Row], где каждая строка принадлежит одномутрех типов и имеет разное количество полей.Сейчас я кеширую RDD, затем фильтрую его (используя поле, сообщающее мне о типе записи) и сохраняю данные, используя следующий метод:

var resultDF_1 = sqlContext.createDataFrame(filtered_data_1, schema_1)
resultDF_1.write.parquet(output_path_1)
...
// the same for filtered_data_2 and filtered_data_3

Есть ли способчтобы сделать это лучше, например, не кэшировать все данные в памяти?

В MapReduce у нас есть класс MultipleOutputs, и мы можем сделать это следующим образом:

MultipleOutputs.addNamedOutput(job, "data_type_1", DataType1OutputFormat.class, Void.class, Group.class);
MultipleOutputs.addNamedOutput(job, "data_type_2", DataType2OutputFormat.class, Void.class, Group.class);
MultipleOutputs.addNamedOutput(job, "data_type_3", DataType3OutputFormat.class, Void.class, Group.class);
...
MultipleOutputs<Void, Group> mos = new MultipleOutputs<>(context);
mos.write("data_type_1", null, myRecordGroup1, filePath1);
mos.write("data_type_2", null, myRecordGroup2, filePath2);
...

1 Ответ

0 голосов
/ 01 марта 2019

AFAIK, нет способа разделить один RDD на несколько RDD как таковых.Именно так работает DAG Spark: только дочерние RDD, извлекающие данные из родительских RDD.

Однако мы можем иметь несколько дочерних RDD, читающих из одного и того же родительского RDD.Чтобы избежать повторного вычисления родительского RDD, нет другого способа, кроме как кэшировать его. Я предполагаю, что вы хотите избежать кэширования, потому что боитесь нехватки памяти. Мы можем избежать проблемы нехватки памяти (OOM), сохранив СДР на MEMORY_AND_DISK, так что большой СДД попадет на диск, еслии при необходимости.

Давайте начнем с ваших исходных данных:

val allDataRDD = sc.parallelize(Seq(Row(1,1,1),Row(2,2,2),Row(3,3,3)))

Сначала мы можем сохранить это в памяти, но в случае ее нехватки выпустить на диск:

allDataRDD.persist(StorageLevel.MEMORY_AND_DISK)

Затем мы создаем 3 выхода RDD:

filtered_data_1 = allDataRDD.filter(_.get(1)==1) // //
filtered_data_2 = allDataRDD.filter(_.get(2)==1) // use your own filter funcs here
filtered_data_3 = allDataRDD.filter(_.get(3)==1) // //

Затем запишем выходные данные:

var resultDF_1 = sqlContext.createDataFrame(filtered_data_1, schema_1)
resultDF_1.write.parquet(output_path_1)
var resultDF_2 = sqlContext.createDataFrame(filtered_data_2, schema_2)
resultDF_2.write.parquet(output_path_2)
var resultDF_3 = sqlContext.createDataFrame(filtered_data_3, schema_3)
resultDF_3.write.parquet(output_path_3)

Если вы действительно действительно хотите избежать несколькихпроходит, есть обходной путь, используя пользовательский разделитель.Вы можете перераспределить ваши данные на 3 раздела, и у каждого раздела будет своя собственная задача и, следовательно, собственный выходной файл / часть.Предостережение заключается в том, что параллелизм будет сильно сокращен до 3 потоков / задач, а также существует риск> 2 ГБ данных, хранящихся в одном разделе (Spark имеет ограничение 2 ГБ на раздел).Я не предоставляю подробный код для этого метода, потому что я не думаю, что он может писать файлы паркета с другой схемой.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...