Очистка контрольной точки Spark Dataframe - PullRequest
0 голосов
/ 31 января 2020

У меня есть датафрейм в спарк, где был загружен весь раздел из Hive, и мне нужно разорвать родословную, чтобы перезаписать тот же раздел после некоторых модификаций данных. Тем не менее, когда работа с искрой завершена, у меня остаются данные с контрольной точки на HDFS. Почему Spark не очищает это сам по себе или мне чего-то не хватает?

spark.sparkContext.setCheckpointDir("/home/user/checkpoint/")
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

val df = spark.table("db.my_table").filter(col("partition").equal(2))

// ... transformations to the dataframe

val checkpointDf = df.checkpoint()
checkpointDf.write.format("parquet").mode(SaveMode.Overwrite).insertInto("db.my_table")

После этого у меня есть этот файл на HDFS:

/home/user/checkpoint/214797f2-ce2e-4962-973d-8f215e5d5dd8/rdd-23/part-00000

И каждый раз, когда я запускаю задание spark: я просто получаю новый каталог с новым уникальным идентификатором, содержащим файлы для каждого RDD, который был в кадрах данных.

1 Ответ

0 голосов
/ 01 февраля 2020

Spark имеет неявный механизм очистки файлов контрольных точек.

Добавьте это свойство в spark-defaults.conf.

spark.cleaner.referenceTracking.cleanCheckpoints  true #Default is false

Подробнее о конфигурации Spark можно узнать в Официальный Spark страница конфигурации

Если вы хотите удалить каталог контрольных точек из HDFS, вы можете удалить его с помощью Python, в конце вашего скрипта вы можете использовать эту команду rmtree .

Это свойство spark.cleaner.referenceTracking.cleanCheckpoints как true, позволяет очистить удалить старые файлы контрольных точек в каталоге контрольных точек.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...