Ниже фрагмент кода работает нормально. (Чтение CSV, Чтение паркета и объединение друг с другом)
//Reading csv file -- getting three columns: Number of records: 1
df1=spark.read.format("csv").load(filePath)
df2=spark.read.parquet(inputFilePath)
//Join with Another table : Number of records: 30 Million, total
columns: 15
df2.join(broadcast(df1), col("df2col1") === col("df1col1") "right")
Это утомлено, что приведенный ниже фрагмент кода не работает.(Чтение Hbase, Чтение паркета и присоединение друг к другу) (Различие заключается в чтении из Hbase)
//Reading from Hbase (It read from hbase properly -- getting three columns: Number of records: 1
df1=read from Hbase code
// It read from Hbase properly and able to show one record.
df1.show
df2=spark.read.parquet(inputFilePath)
//Join with Another table : Number of records: 50 Million, total
columns: 15
df2.join(broadcast(df1), col("df2col1") === col("df1col1") "right")
Ошибка : вызвано: org.apache.spark.SparkException: Jobпрервано из-за сбоя этапа: общий размер сериализованных результатов 56 задач (1024,4 МБ) больше, чем spark.driver.maxResultSize (1024,0 МБ)
Затем я добавил spark.driver.maxResultSize = 5g, затем еще одинвозникла ошибка, ошибка пространства кучи Java (запустите на ThreadPoolExecutor.java).Если я наблюдаю использование памяти в Менеджере, я вижу, что использование просто продолжает расти, пока не достигнет ~ 50 ГБ, после чего возникает ошибка OOM.Поэтому по какой-либо причине объем оперативной памяти, используемой для выполнения этой операции, в ~ 10 раз превышает размер RDD, который я пытаюсь использовать.
Если я сохраню df1 в памяти и на диске и выполняю подсчет().Программа работает отлично.Фрагмент кода ниже
//Reading from Hbase -- getting three columns: Number of records: 1
df1=read from Hbase code
**df1.persist(StorageLevel.MEMORY_AND_DISK)
val cnt = df1.count()**
df2=spark.read.parquet(inputFilePath)
//Join with Another table : Number of records: 50 Million, total
columns: 15
df2.join(broadcast(df1), col("df2col1") === col("df1col1") "right")
Он работает с файлом, даже если он имеет те же данные, но не с Hbase.Выполнение этого на кластере 100 рабочих узлов с 125 ГБ памяти на каждом.Так что память не проблема.
Мой вопрос здесь заключается в том, что и файл, и Hbase имеют одинаковые данные и оба могут читать и отображать () данные.Но почему только Hbase терпит неудачу.Я изо всех сил пытаюсь понять, что может пойти не так с этим кодом.Любые предложения будут оценены.