Используйте RDD.foreach для создания Dataframe и выполнения действий над Dataframe в Spark Scala - PullRequest
0 голосов
/ 20 мая 2019

Я пытаюсь прочитать конфигурационный файл в spark read.textfile, который в основном содержит мой список таблиц. Моя задача - перебрать список таблиц и конвертировать Avro в формат ORC. пожалуйста, найдите мой фрагмент кода ниже, который сделает логику.

val tableList = spark.read.textFile('tables.txt')
tableList.collect().foreach(tblName => {
val df = spark.read.format("avro").load(inputPath+ "/" + tblName)
df.write.format("orc").mode("overwrite").save(outputPath+"/"+tblName)})

Пожалуйста, найдите мои конфигурации ниже

DriverMemory: 4 ГБ

ExecutorMemory: 10 ГБ

NoOfExecutors: 5

Размер входных данных: 45 ГБ

Мой вопрос здесь, это будет выполняться в Executor или Driver? Это выбросит Ошибка памяти? Пожалуйста, прокомментируйте ваши предложения.

val tableList = spark.read.textFile('tables.txt')

tableList.collect().foreach(tblName => {

val df = spark.read.format("avro").load(inputPath+ "/" + tblName)

df.write.format("orc").mode("overwrite").save(outputPath+"/"+tblName)}

)

Ответы [ 2 ]

0 голосов
/ 21 мая 2019

Re:

это будет выполняться в Executor или Driver?

Как только вы вызовете tableList.collect (), содержимое файла tables.txt будет перенесено в приложение Driver. Если это хорошо в памяти водителя, все должно быть в порядке. Однако операция сохранения в Dataframe будет выполнена для исполнителя.

Re:

Это приведет к ошибке «Недостаточно памяти»?

Сталкивались ли вы с одним? IMO, если ваш файл tables.txt не слишком велик, у вас должно быть все в порядке. Я предполагаю, что размер входных данных равен 45 ГБ - это данные в таблицах, упомянутых в tables.txt.

Надеюсь, это поможет.

0 голосов
/ 20 мая 2019

Я бы предложил исключить сбор, поскольку это действие, поэтому все данные из вашего 45-гигабайтного файла загружаются в память. Вы можете попробовать что-то вроде этого

val tableList = spark.read.textFile('tables.txt')
tableList.foreach(tblName => {
val df = spark.read.format("avro").load(inputPath+ "/" + tblName)
df.write.format("orc").mode("overwrite").save(outputPath+"/"+tblName)})
...