Spark Как записать в файл паркета из данных, используя синхронный API - PullRequest
0 голосов
/ 31 января 2020

У меня есть сценарий использования, который я пытаюсь решить с помощью Spark. Случай использования заключается в том, что мне нужно вызвать API, который ожидает batchSize и token, а затем он возвращает токен для следующей страницы. Это дает мне список JSON объектов. Теперь я должен вызывать этот API до тех пор, пока не будут возвращены все результаты, и записать их все в s3 в формате паркета. Размер возвращаемого объекта может варьироваться от 0 до 100 миллионов.

Мой подход заключается в том, что я сначала получаю, скажем, пакет из 1 миллиона объектов, я конвертирую их в набор данных и затем записываю в паркет, используя

dataSet.repartition(1).write.mode(SaveMode.Append)
      .option("mapreduce.fileoutputcommitter.algorithm.version", "2")
      .parquet(s"s3a://somepath/")

и затем повторяйте процесс, пока мой API не скажет, что данных больше нет, т.е. token равно нулю

Таким образом, процесс заключается в том, что эти вызовы API должны будут выполняться на драйвере и последовательно. И как только я получу миллион, я напишу на s3.

Я видел эти проблемы с памятью на драйвере.

Application application_1580165903122_19411 failed 1 times due to AM Container for appattempt_1580165903122_19411_000001 exited with exitCode: -104
Diagnostics: Container [pid=28727,containerID=container_1580165903122_19411_01_000001] is running beyond physical memory limits. Current usage: 6.6 GB of 6.6 GB physical memory used; 16.5 GB of 13.9 GB virtual memory used. Killing container.
Dump of the process-tree for container_1580165903122_19411_01_000001 :

Я видел странное поведение в том смысле, что иногда 30 миллионов работают нормально, а иногда это не удается из-за этого. Даже 1 миллион иногда терпит неудачу.

Мне интересно, делаю ли я какую-то очень глупую ошибку или есть какой-то лучший подход к этому?

1 Ответ

0 голосов
/ 01 февраля 2020

Этот дизайн не масштабируется и создает большую нагрузку на драйвер, поэтому ожидается, что он достигнет sh. Кроме того, перед записью в s3 в памяти накапливается много данных.

Я рекомендую вам использовать потоковую передачу Spark для чтения данных из API. Таким образом, многие исполнители выполнят свою работу, и решение будет значительно масштабируемым. , Вот пример - вызов службы RestAPI из Spark Streaming

В этих исполнителях вы можете накапливать ответ API сбалансированным образом, например, накапливать 20 000 записей, но не ждать 5M записей. После, скажем, 20 000, запишите их в S3 в режиме «добавления». Режим «добавления» поможет нескольким процессам работать в тандеме и не наступать друг на друга.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...