Итеративная искра с кэшированием исчерпывает память - PullRequest
1 голос
/ 05 июня 2019

У меня есть крошечный фрейм данных pyspark с отношениями и функцией, которая вычисляет транзитивное замыкание. Я уже знаю несколько способов, которыми я могу улучшить функцию (включая избавление от groupBy), но давайте придерживаться этого. Когда я итеративно применяю функцию замыкания, время вычислений экспоненциально увеличивается, и искра даже исчерпывает память кучи.

from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col, min

def closure(eq: DataFrame) -> DataFrame:
    eqrev = eq.select(col("ID2").alias("ID1"), col("ID1").alias("ID2"))
    bi = eq.union(eqrev).distinct().cache()

    oldCount = 0
    nextCount = bi.count()

    while True:
        oldCount = nextCount
        newEdges = bi.alias("b1").join(bi.alias("b2"), col("b1.ID1") == col("b2.ID1")).select(col("b1.ID2").alias("ID1"), col("b2.ID2"))
        bi = bi.union(newEdges).distinct().cache()
        nextCount = bi.count()
        if nextCount == oldCount:
            break

    return bi.alias("b1").filter(col("b1.ID1") > col("b1.ID2")).groupBy("ID1").agg(min("ID2").alias("ID2")).cache()

b0 = sqlContext.createDataFrame([[ 22, 18 ], [ 20, 15] , [ 25, 26], [ 25, 29 ]], [ "ID1", "ID2" ])

b1 = closure(b0)
display(b1)
b2 = closure(b1)
display(b2)
b3 = closure(b2)
display(b3)
b4 = closure(b3)

b1, b2, b3 все имеют 4 строки и 200 разделов (которые вводятся join). План выполнения растет линейно: для b4 это 13 этапов. В моем небольшом кластере вычисление b2 занимает 8 секунд, b3 занимает 40 секунд, а b4 дает java.lang.OutOfMemoryError: Java heap space через несколько минут.

Я бы ожидал, что, учитывая, что я кеширую результат каждого закрытия, искровой двигатель сможет это решить.

Некоторые статьи по теме:

Если я изменю .cache() в последней строке функции на .localCheckpoint(), я не получу ни увеличение времени выполнения, ни исключение нехватки памяти. Документация `localCheckPoint) гласит: Контрольные точки могут использоваться для усечения логического плана этого DataFrame, что особенно полезно в итерационных алгоритмах, где план может расти в геометрической прогрессии. Локальные контрольные точки хранятся у исполнителей, использующих подсистему кэширования, и поэтому они ненадежны.

Теперь у меня есть следующие вопросы:

  • У меня уже есть проблемы с 4 итерациями почти без данных. Это действительно следует ожидать?

  • Почему время вычислений увеличивается так быстро, и почему двигателю не хватает места в куче? План выполнения все еще умещается на моем экране.

  • Каковы последствия использования localCheckPoint в случае сбоев?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...