У меня есть крошечный фрейм данных pyspark с отношениями и функцией, которая вычисляет транзитивное замыкание. Я уже знаю несколько способов, которыми я могу улучшить функцию (включая избавление от groupBy
), но давайте придерживаться этого. Когда я итеративно применяю функцию замыкания, время вычислений экспоненциально увеличивается, и искра даже исчерпывает память кучи.
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col, min
def closure(eq: DataFrame) -> DataFrame:
eqrev = eq.select(col("ID2").alias("ID1"), col("ID1").alias("ID2"))
bi = eq.union(eqrev).distinct().cache()
oldCount = 0
nextCount = bi.count()
while True:
oldCount = nextCount
newEdges = bi.alias("b1").join(bi.alias("b2"), col("b1.ID1") == col("b2.ID1")).select(col("b1.ID2").alias("ID1"), col("b2.ID2"))
bi = bi.union(newEdges).distinct().cache()
nextCount = bi.count()
if nextCount == oldCount:
break
return bi.alias("b1").filter(col("b1.ID1") > col("b1.ID2")).groupBy("ID1").agg(min("ID2").alias("ID2")).cache()
b0 = sqlContext.createDataFrame([[ 22, 18 ], [ 20, 15] , [ 25, 26], [ 25, 29 ]], [ "ID1", "ID2" ])
b1 = closure(b0)
display(b1)
b2 = closure(b1)
display(b2)
b3 = closure(b2)
display(b3)
b4 = closure(b3)
b1
, b2
, b3
все имеют 4 строки и 200 разделов (которые вводятся join
). План выполнения растет линейно: для b4
это 13 этапов.
В моем небольшом кластере вычисление b2
занимает 8 секунд, b3
занимает 40 секунд, а b4
дает java.lang.OutOfMemoryError: Java heap space
через несколько минут.
Я бы ожидал, что, учитывая, что я кеширую результат каждого закрытия, искровой двигатель сможет это решить.
Некоторые статьи по теме:
Если я изменю .cache()
в последней строке функции на .localCheckpoint()
, я не получу ни увеличение времени выполнения, ни исключение нехватки памяти. Документация `localCheckPoint) гласит: Контрольные точки могут использоваться для усечения логического плана этого DataFrame, что особенно полезно в итерационных алгоритмах, где план может расти в геометрической прогрессии. Локальные контрольные точки хранятся у исполнителей, использующих подсистему кэширования, и поэтому они ненадежны.
Теперь у меня есть следующие вопросы:
У меня уже есть проблемы с 4 итерациями почти без данных. Это действительно следует ожидать?
Почему время вычислений увеличивается так быстро, и почему двигателю не хватает места в куче? План выполнения все еще умещается на моем экране.
Каковы последствия использования localCheckPoint
в случае сбоев?