Какова рекомендуемая стратегия кэширования для итерационных вычислений в Spark - PullRequest
0 голосов
/ 14 мая 2018

У меня есть спарк-приложение, выполняющее этапы вычислений - то есть
1. Вычислите ввод большого набора данных в AWS S3 (ds: data-original), получите отфильтрованные результаты (ds1: data-intermediate) и сохраните ds1 в AWS S3.
2. И в том же приложении продолжаем обрабатывать (фильтровать) ds1: data-intermediate и выводить конечный результат ds2: data-final.
3. Выполните некоторые действия на ds2, а затем сохраните ds2 на AWS S3.

Таким образом, обработка выглядит так:

ds -> ds1
//ds1.persist()
ds1.write.save(...)
ds1 -> ds2
ds2.cache()
ds2.count
ds2.distinct
ds2.write.save(...)

Размер данных ds / ds1 / ds2 примерно равен 100 ГБ, 10 ГБ, 1 ГБ соответственно.
Я хочу использовать RDD-кэширование, чтобы избежать избыточных вычислений, но также хочу избежать ненужных SerDes и дискового ввода-вывода.

Мой вопрос: если я не сохраню ds1, как показано в строке //ds1.persist(), будет ли приложение перезапускать ds1, когда оно вычислит ds1 -> ds2?

Спасибо!

Ответы [ 2 ]

0 голосов
/ 14 мая 2018

Если я правильно понял вопрос, никаких действий нет, кроме действий сверху d2(the final result). Поскольку искровые преобразования ленивы, пока они не встретятся с действием, предыдущие СДР не будут материализованы. Чтобы ответить на ваш вопрос:

если я не сохраню ds1, как показано линией //ds1.persist (), вызовет повторное вычисление приложением ds1 при вычислении ds1 -> ds2?

, даже если вы кешируете / сохраняете ds1, оно не будет вычислено, пока вы не посчитаете / отличитесь от ds2. поэтому ds1 не «пересчитывается», как это вычисляется впервые.

Ваша обработка может быть:

1)ds -> ds1
2)ds1 -> ds2
3)ds2.cache()
4)ds2.count  //your first action. at this point ds1 & ds2 will be computed & cached
5)ds2.distinct  // will not compute ds1 & ds2 again as its cached previously.
6)ds2.write.save(...) // assuming you are not storing d2 directly but after some transformation/action on top of d2. if you just want to save d2, then do write.save(..) on line 3 instead.
0 голосов
/ 14 мая 2018

У вас есть несколько вариантов, от быстрого и дорогого до медленного и дешевого:

  • df.persist(MEMORY_ONLY) кэширует данные в ОЗУ кластера. Это очень быстро, если у вас достаточно памяти. Стал отличным вариантом после изменения ценовой модели AWS на секунду. Недостаток: если у вас сложный конвейер, который вы запускаете на точечных экземплярах, сбой одного экземпляра приводит к сбою всего вычисления.
  • df.persist(MEMORY_AND_DISK) медленнее, но все же хороший вариант, поскольку данные кэшируются в локальных хранилищах исполнителей Spark. У него тот же недостаток, что и у варианта 1.
  • Сохранение промежуточных вычислений в HDFS в эффективном формате (столбчатый, как паркет в большинстве случаев). Он должен быть близок к варианту 2 с точки зрения производительности и допускает сбой экземпляра, если вы запускаете его на EMR.
  • Сохраните промежуточные вычисления в S3, который является самым медленным, но, возможно, хорошим вариантом, если вы запускаете переходный кластер и вероятность сбоя в середине, значительна.

Обратите внимание, что в опциях 3 необходимо сохранить, а затем загрузить новый фрейм данных, чтобы вырезать график вычислений.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...