Скажем, у меня есть 4 блока инструкций искры, представленных как Dataframe A, B, C и D.
Dataframe имеет зависимости:
- C требует, чтобы A & B были выполнены
- D требует, чтобы B был готов
Теперь я хочу сохранить и C, и вывод D.Чтобы не пересчитывать B дважды, я использую
B = B.cache()
Как есть, если я сначала сохраняю C, у меня A & B работает параллельно, а затем C, а затем, когда я сохраняю D, нужно будет только вычислить D иэто достаточно хорошее планирование.
Теперь проблема заключается в следующем: Если вместо кэширования BI сохранить его в паркет как этот
B.write.mode("overwrite").parquet(CacheLocation)
B = spark.read.parquet(CacheLocation)
Эта скорость в течение всего циклакак минимум в 10 раз быстрее, чем при использовании B.cache () (и в 30 раз быстрее, чем вообще не использовать кеш на B)
Проблема с вышесказанным заключается в том, что теперь я не могу одновременно запустить A и Bвремя, если я не использую фьючерсы и не решаю кешировать А.Я очень ограничен в производительности, и мой реальный код имеет более 50 выходов блоков / 4 для синхронизации, что является кошмаром, если я не могу использовать планирование Spark по умолчанию.
Тогда мне было интересно, есть ли способ переопределить метод кэширования Spark для сохранения в виде паркета вместо памяти?Или есть какие-нибудь способы добавить такое кэширование паркета в собственный планировщик Spark?
Иллюстрация псевдокода:
Сценарий 1: при использовании кэширования весь цикл выполняется за 300 с, но A и B работают параллельно
val A = parquet.read(A).withColumn(NewA,FormulaA)
val B = parquet.read(B).withColumn(NewB,FormulaC).cache
val C = A.join(B, FormulaA===FormulaC,left)
val D = B.withColumn(NewD, FormulaD)
C.write.mode("overwrite").parquet(SaveCLocation)
D.write.mode("overwrite").parquet(SaveDLocation)
Сценарий 2: с использованием паркета в качестве кэширования, весь цикл в 30 с, но серийный запуск A и B
val A = parquet(A).withColumn(NewA,FormulaA)
var B = parquet(B).withColumn(NewB,FormulaC)
B.write.parquet(CacheLocation)
B = spark.read.parquet(CacheLocation)
val C = A.join(B, FormulaA===FormulaC,left)
val D = B.withColumn(NewD, FormulaD)
C.write.parquet(SaveCLocation)
D.write.parquet(SaveDLocation)