Какой самый эффективный способ вызвать немедленное кэширование в Spark? - PullRequest
0 голосов
/ 20 февраля 2020

Для целей синхронизации мне нужно принудительно установить кэш перед выполнением функции. Мой первоначальный подход состоял в том, чтобы использовать действие count(), поскольку оно будет кэшировать СДР во всех разделах, в отличие от take(), но есть ли более эффективный способ принудительно применить его к вычислениям, связи или времени?

// Load data, partition and mark to be cached
val data = sc.textFile("input.txt").map(_.toInt)
val partitioner = new RangePartitioner(16, data)
val partitioned_data = data.partitionBy(partitioner).cache()

// Force cache with count or something more efficient
partitioned_data.count()

// Do something
something(partitioned_data)

1 Ответ

1 голос
/ 20 февраля 2020

Все это зависит от того, что вы пытаетесь сделать. Если вы заметили, что ваша среда близка к своему пределу с постоянной памятью, я бы посоветовал сохранить локальный, очистить кэш, перезагрузить и повторно кэшировать технику. Однако ниже я ncatalogued все легкие функции и запускаю их для файла записи 2M, чтобы показать их относительное время выполнения в сравнении.

Подиум таков:

1-й (Трехсторонний-t ie): дубль (1), дубль (1000), первый; время: 9 секунд

4: счет; время: 17 секунд

5th: собирать; время: 21 секунда

Отказ от ответственности-1: да, я знаю, что счет потерян, но я объявляю его тайным победителем из-за множества очков произвольного стиля, которые он получил, главным образом потому, что я думаю, что этот ответ постепенно становится «Чей Line Is It Anyway '.

Disclaimer-2: Все тесты были в состоянии запускаться с конфигурациями памяти Spark по умолчанию, за исключением сбора, где мне нужно было установить его примерно на коэффициент выше, и это дало запуск 21 секунды время.

Если вы хотите попробовать это дома, вот код, который вы можете запустить ( играет банально-игровое шоу musi c):

val inputDF = spark.read.format("").load("")

var arrayOfCommand : Array[String] = Array("")
var arrayOfTime : Array[Long] = Array("0".toLong)

inputDF.count

val inputDF2 = inputDF.selectExpr("*", "'Count Run' as CommandColumn").persist

val countStartTime = System.nanoTime()
inputDF2.count

val countEndTime = System.nanoTime()
val countRunTime = (countEndTime-countStartTime)/1000000000

arrayOfCommand = Array("Count")
arrayOfTime = Array(countRunTime)

spark.catalog.clearCache
val inputDF3 = inputDF.selectExpr("*", "'Take 1 Run' as CommandColumn").persist

val takeStartTime = System.nanoTime()
inputDF3.take(1)

val takeEndTime = System.nanoTime()
val takeRunTime = (takeEndTime-takeStartTime)/1000000000

arrayOfCommand = arrayOfCommand ++ Array("Take(1)")
arrayOfTime = arrayOfTime ++ Array(takeRunTime)

spark.catalog.clearCache
val inputDF4 = inputDF.selectExpr("*", "'Take 1000 Run' as CommandColumn").persist

val takeStartTime2 = System.nanoTime()
inputDF4.take(1000)

val takeEndTime2 = System.nanoTime()
val takeRunTime2 = (takeEndTime2-takeStartTime2)/1000000000

arrayOfCommand = arrayOfCommand ++ Array("Take(1000)")
arrayOfTime = arrayOfTime ++ Array(takeRunTime)

spark.catalog.clearCache
val inputDF5 = inputDF.selectExpr("*", "'Collect Run' as CommandColumn").persist

val collectStartTime = System.nanoTime()
inputDF5.collect

val collectEndTime = System.nanoTime()
val collectRunTime = (collectEndTime-collectStartTime)/1000000000

arrayOfCommand = arrayOfCommand ++ Array("Collect")
arrayOfTime = arrayOfTime ++ Array(collectRunTime)


spark.catalog.clearCache
val inputDF6 = inputDF.selectExpr("*", "'First Run' as CommandColumn").persist

val firstStartTime = System.nanoTime()
inputDF6.first

val firstEndTime = System.nanoTime()
val firstRunTime = (firstEndTime-firstStartTime)/1000000000

arrayOfCommand = arrayOfCommand ++ Array("First")
arrayOfTime = arrayOfTime ++ Array(firstRunTime)
...