Pyspark - не сохраняющий родительский фрейм данных также удаляет дочерний фрейм данных из кэша - PullRequest
0 голосов
/ 05 февраля 2019

Я делаю что-то вроде этого:

import pandas as pd

pdf = pd.DataFrame({
    'a': [1, 2, 3],
    'b': ['a', 'b', 'c']
})

parent_df = spark.createDataFrame(pdf)
parent_df.cache().count()  

child_df = parent_df.replace('c', 'x')
child_df.cache().count()

parent_df.unpersist()

По сути, я хочу кэшировать parent_df, потому что на следующих шагах я делаю некоторые тяжелые преобразования для него.После того, как я закончу их и вернусь child_df, мне больше не нужен parent_df, поэтому я хочу выпустить его из кэша.Тем не менее, при этом unpersists также недавно кэшированный child_df!

Очевидно, что вопросы таковы:

  • почему это происходит?
  • как мне добиться того, чего я хочу (освобождая parent_df из кэша, сохраняя при этомновый child_df в кеше)?

Интересно, что работает противоположный сценарий - то есть, если я отменю child_df вместо parent_df в последней строке, parent_df будет кешироваться, как и ожидалось, покаchild_df будет выпущено.

PS: я нашел похожий вопрос здесь Понимание кэширования Spark .Однако ответ на этот вопрос, похоже, не работает в этом случае, поскольку здесь мы уже вызываем действие (.count()) сразу после кэширования.

Ответы [ 2 ]

0 голосов
/ 05 августа 2019

Это сознательное проектное решение, основанное на согласованности данных.Одной из возможных причин отказа от родительского контроля может быть то, что вы ожидаете, что его исходные данные изменятся.Наличие родителя с новыми данными и очевидного ребенка, использующего старые данные, может привести к неожиданным и противоречивым результатам.Таким образом, любые кэшированные дочерние элементы родительского элемента становятся недействительными, если родитель имеет значение.

Существует небольшое обсуждение в PR, который реализовал это изменение и в этом отчете об ошибках после измененияпредставил .

Как уже упоминалось во второй ссылке, если вам нужно сохранить ребенка, вы можете сделать это, материализовав его в виде таблицы, используя saveAsTable.

0 голосов
/ 05 февраля 2019

Хорошо, я думаю, что нашел решение:

  • Во-первых, мое предположение относительно того, почему это происходит, заключается в том, что точка кэширования parent_df является частью child_df.родословная.Т.е. даже если child_df использует более позднюю точку кэширования, ее DAG по-прежнему содержит более ранний бит из parent_df.Таким образом, удаление этой точки кэша как-то повлияет на более поздние точки кэширования.

  • Что касается того, как предотвратить это, выполните следующие действия:

import pandas as pd

pdf = pd.DataFrame({
    'a': [1, 2, 3],
    'b': ['a', 'b', 'c']
})

parent_df = spark.createDataFrame(pdf)
parent_df.cache().count()  

# this is the relevant line
child_df = spark.createDataFrame(parent_df.rdd, schema=parent_df.schema) 

child_df = child_df.replace('c', 'x')
child_df.cache().count()

parent_df.unpersist()

Что происходит в соответствующей строке (отмеченной комментарием), так это то, что линия child_df обрезана, чтобы не включать секцию, соответствующую parent_df, и начинается с «свежего СДР».Unpersisting parent_df затем оставляет линию child_df неизменной.

Опять же - хотя это, кажется, работает, я приветствую больше объяснений / подтверждений этой теории как принятый ответ!

...