Можем ли мы повторить поведение Spark .cache (), но вместо этого сохранить файл партера? - PullRequest
0 голосов
/ 14 июня 2019

Скажем, у меня есть 4 блока инструкций искры, представленных как Dataframe A, B, C и D.

Dataframe имеет зависимости:

  • C требует, чтобы A & B были выполнены
  • D требует, чтобы B был готов

Теперь я хочу сохранить и C, и вывод D.Чтобы не пересчитывать B дважды, я использую

B = B.cache()

Как есть, если я сначала сохраняю C, у меня A & B работает параллельно, а затем C, а затем, когда я сохраняю D, нужно будет только вычислить D иэто достаточно хорошее планирование.

Теперь проблема заключается в следующем: Если вместо кэширования BI сохранить его в паркет как этот

B.write.mode("overwrite").parquet(CacheLocation)
B = spark.read.parquet(CacheLocation)

Эта скорость в течение всего циклакак минимум в 10 раз быстрее, чем при использовании B.cache () (и в 30 раз быстрее, чем вообще не использовать кеш на B)

Проблема с вышесказанным заключается в том, что теперь я не могу одновременно запустить A и Bвремя, если я не использую фьючерсы и не решаю кешировать А.Я очень ограничен в производительности, и мой реальный код имеет более 50 выходов блоков / 4 для синхронизации, что является кошмаром, если я не могу использовать планирование Spark по умолчанию.

Тогда мне было интересно, есть ли способ переопределить метод кэширования Spark для сохранения в виде паркета вместо памяти?Или есть какие-нибудь способы добавить такое кэширование паркета в собственный планировщик Spark?

Иллюстрация псевдокода:

Сценарий 1: при использовании кэширования весь цикл выполняется за 300 с, но A и B работают параллельно

val A = parquet.read(A).withColumn(NewA,FormulaA)
val B = parquet.read(B).withColumn(NewB,FormulaC).cache
val C = A.join(B, FormulaA===FormulaC,left)
val D = B.withColumn(NewD, FormulaD)
C.write.mode("overwrite").parquet(SaveCLocation)
D.write.mode("overwrite").parquet(SaveDLocation)

Сценарий 2: с использованием паркета в качестве кэширования, весь цикл в 30 с, но серийный запуск A и B

val A = parquet(A).withColumn(NewA,FormulaA)
var B = parquet(B).withColumn(NewB,FormulaC)
B.write.parquet(CacheLocation)
B = spark.read.parquet(CacheLocation)
val C = A.join(B, FormulaA===FormulaC,left)
val D = B.withColumn(NewD, FormulaD)
C.write.parquet(SaveCLocation)
D.write.parquet(SaveDLocation)

1 Ответ

0 голосов
/ 14 июня 2019

Из того, что вы разместили здесь, мое подозрение верно.

B НЕ оценивается (или не кэшируется) в вашем cache сценарии до этого действия

C.write.mode("overwrite").parquet(SaveCLocation) 

В вашем parquet сценарии, однако, B оценивается (и сохраняется) для этого действия:

B.write.parquet(CacheLocation)

Другими словами, вы не можете сравнивать время выполнения val C = A.join(B, FormulaA===FormulaC,left) в двух сценариях, потому что в одном сценарии B уже оценена и просто должна быть считана с диска, а в другомон должен оценить B первым.

Попробуйте добавить B.count в ваш сценарий кэширования - это вызовет оценку B точно так же, как B.write делает в вашем паркет-сценарии.Затем сравните время выполнения val C = A.join(B, FormulaA===FormulaC,left) в двух сценариях - я уверен, что вы увидите большое преимущество для сценария кэширования:)

val A = parquet.read(A).withColumn(NewA,FormulaA)
val B = parquet.read(B).withColumn(NewB,FormulaC).cache
B.count // force evaluation of B
val C = A.join(B, FormulaA===FormulaC,left)
val D = B.withColumn(NewD, FormulaD)
C.write.mode("overwrite").parquet(SaveCLocation)
D.write.mode("overwrite").parquet(SaveDLocation)
...