Spark Dataframe Присоединяйтесь к выпуску - PullRequest
3 голосов
/ 11 марта 2019

Ниже фрагмент кода работает нормально. (Чтение CSV, Чтение паркета и объединение друг с другом)

//Reading csv file -- getting three columns: Number of records: 1
 df1=spark.read.format("csv").load(filePath) 

df2=spark.read.parquet(inputFilePath)

//Join with Another table : Number of records: 30 Million, total 
columns: 15
df2.join(broadcast(df1), col("df2col1") === col("df1col1")  "right")

Это утомлено, что приведенный ниже фрагмент кода не работает.(Чтение Hbase, Чтение паркета и присоединение друг к другу) (Различие заключается в чтении из Hbase)

//Reading from Hbase (It read from hbase properly -- getting three columns: Number of records: 1
 df1=read from Hbase code
 // It read from Hbase properly and able to show one record.
 df1.show

df2=spark.read.parquet(inputFilePath)

//Join with Another table : Number of records: 50 Million, total 
columns: 15
df2.join(broadcast(df1), col("df2col1") === col("df1col1")  "right")

Ошибка : вызвано: org.apache.spark.SparkException: Jobпрервано из-за сбоя этапа: общий размер сериализованных результатов 56 задач (1024,4 МБ) больше, чем spark.driver.maxResultSize (1024,0 МБ)

Затем я добавил spark.driver.maxResultSize = 5g, затем еще одинвозникла ошибка, ошибка пространства кучи Java (запустите на ThreadPoolExecutor.java).Если я наблюдаю использование памяти в Менеджере, я вижу, что использование просто продолжает расти, пока не достигнет ~ 50 ГБ, после чего возникает ошибка OOM.Поэтому по какой-либо причине объем оперативной памяти, используемой для выполнения этой операции, в ~ 10 раз превышает размер RDD, который я пытаюсь использовать.

Если я сохраню df1 в памяти и на диске и выполняю подсчет().Программа работает отлично.Фрагмент кода ниже

//Reading from Hbase -- getting three columns: Number of records: 1
 df1=read from Hbase code

**df1.persist(StorageLevel.MEMORY_AND_DISK)
val cnt = df1.count()**

df2=spark.read.parquet(inputFilePath)

//Join with Another table : Number of records: 50 Million, total 
columns: 15
df2.join(broadcast(df1), col("df2col1") === col("df1col1")  "right")

Он работает с файлом, даже если он имеет те же данные, но не с Hbase.Выполнение этого на кластере 100 рабочих узлов с 125 ГБ памяти на каждом.Так что память не проблема.

Мой вопрос здесь заключается в том, что и файл, и Hbase имеют одинаковые данные и оба могут читать и отображать () данные.Но почему только Hbase терпит неудачу.Я изо всех сил пытаюсь понять, что может пойти не так с этим кодом.Любые предложения будут оценены.

Ответы [ 3 ]

2 голосов
/ 11 марта 2019

Когда данные извлекаются, искра не знает о количестве строк, которые извлекаются из HBase, поэтому стратегия выбрана как сортировка слиянием.

, таким образом, она пытается сортировать и перетасовывать данные черезexecutors.

, чтобы избежать проблемы, мы можем использовать широковещательное соединение, в то же время мы не будем сортировать и перетасовывать данные через от df2, используя ключевой столбец, который показывает последний оператор в вашем кодефрагмент.

однако, чтобы обойти это (поскольку это только одна строка), мы можем использовать выражение Case для добавляемых столбцов.

пример:

df.withColumn(
"newCol"
,when(col("df2col1").eq(lit(hbaseKey))
    ,lit(hbaseValueCol1))
 .otherwise(lit(null))
1 голос
/ 11 марта 2019

Я тоже иногда борюсь с этой ошибкой. Часто это происходит, когда spark пытается транслировать большую таблицу во время объединения (это происходит, когда оптимизатор spark недооценивает размер таблицы или статистика неверна). Поскольку нет подсказки для принудительного объединения с сортировкой-слиянием ( Как намекнуть для объединения с сортировкой или с перемешанным хеш-соединением (и пропустить широковещательное хеш-соединение)? ), единственный вариант - отключить широковещательные соединения, установив spark.sql.autoBroadcastJoinThreshold= -1

0 голосов
/ 11 марта 2019

Когда у меня возникают проблемы с памятью во время объединения, это обычно означает одну из двух причин:

  1. У вас слишком мало разделов в фреймах данных (разделы слишком большие)
  2. Естьмного дубликатов в двух фреймах данных на ключе, к которому вы присоединяетесь, и объединение взрывает вашу память.

Ad 1. Я думаю, что вы должны посмотреть на количество разделов в каждой таблице перед объединением.Когда Spark читает файл, он не обязательно сохраняет такое же количество разделов, как было в исходной таблице (parquet, csv или другое).Чтение из CSV по сравнению с чтением из HBase может создать разное количество разделов, и поэтому вы видите различия в производительности.Слишком большие разделы становятся еще больше после объединения, и это создает проблемы с памятью.Посмотрите на Пиковое выполнение памяти для каждой задачи в интерфейсе Spark.Это даст вам представление об использовании памяти для каждой задачи.Я посчитал, что лучше всего держать его ниже 1 Гб.

Решение: Переразбейте таблицы перед объединением.

Объявление.2 Может быть, дело не в этом, но стоит проверить.

...