У меня есть сценарий использования, который я пытаюсь решить с помощью Spark. Случай использования заключается в том, что мне нужно вызвать API, который ожидает batchSize
и token
, а затем он возвращает токен для следующей страницы. Это дает мне список JSON объектов. Теперь я должен вызывать этот API до тех пор, пока не будут возвращены все результаты, и записать их все в s3 в формате паркета. Размер возвращаемого объекта может варьироваться от 0 до 100 миллионов.
Мой подход заключается в том, что я сначала получаю, скажем, пакет из 1 миллиона объектов, я конвертирую их в набор данных и затем записываю в паркет, используя
dataSet.repartition(1).write.mode(SaveMode.Append)
.option("mapreduce.fileoutputcommitter.algorithm.version", "2")
.parquet(s"s3a://somepath/")
и затем повторяйте процесс, пока мой API не скажет, что данных больше нет, т.е. token
равно нулю
Таким образом, процесс заключается в том, что эти вызовы API должны будут выполняться на драйвере и последовательно. И как только я получу миллион, я напишу на s3.
Я видел эти проблемы с памятью на драйвере.
Application application_1580165903122_19411 failed 1 times due to AM Container for appattempt_1580165903122_19411_000001 exited with exitCode: -104
Diagnostics: Container [pid=28727,containerID=container_1580165903122_19411_01_000001] is running beyond physical memory limits. Current usage: 6.6 GB of 6.6 GB physical memory used; 16.5 GB of 13.9 GB virtual memory used. Killing container.
Dump of the process-tree for container_1580165903122_19411_01_000001 :
Я видел странное поведение в том смысле, что иногда 30 миллионов работают нормально, а иногда это не удается из-за этого. Даже 1 миллион иногда терпит неудачу.
Мне интересно, делаю ли я какую-то очень глупую ошибку или есть какой-то лучший подход к этому?