Использование контрольной точки данных для перезаписи таблицы завершается неудачей с FileNotFoundException - PullRequest
0 голосов
/ 27 июня 2019

У меня есть некоторый фрейм данных df в pySpark, который получается в результате вызова:

df = spark.sql("select A, B from org_table")
df = df.stuffIdo

Я хочу перезаписать org_table в конце моего скрипта.Поскольку перезапись input-tabels запрещена, я поставил контрольные точки своих данных:

sparkContext.setCheckpointDir("hdfs:/directoryXYZ/PrePro_temp")
checkpointed = df.checkpoint(eager=True)

Теперь линия должна быть нарушена, и я также могу видеть свои контрольные точки с помощью checkpointed.show() (работает).Что не работает, так это написание таблицы:

checkpointed.write.format('parquet')\
    .option("checkpointLocation", "hdfs:/directoryXYZ/PrePro_temp")\
    .mode('overwrite').saveAsTable('org_table')

Это приводит к ошибке:

Причина: java.io.FileNotFoundException: Файл не существует: hdfs: //org_table_path/org_table/part-00081-4e9d12ea-be6a-4a01-8bcf-1e73658a54dd-c000.snappy.parquet

Я пробовал несколько вещей, таких как обновление org_table перед написанием и т. д., ноЯ озадачен здесь. Как я могу решить эту ошибку?

1 Ответ

0 голосов
/ 28 июня 2019

Я был бы осторожен с такими операциями, где преобразованный вход - новый выход Причина в том, что вы можете потерять свои данные в случае любой ошибки. Давайте представим, что ваша логика преобразования содержит ошибки и вы сгенерировали неверные данные. Но вы видели это только один день спустя. Более того, чтобы исправить ошибку, вы не можете использовать только что преобразованные данные. Вам нужны данные до преобразования. Что вы делаете, чтобы снова привести данные в соответствие?

Альтернативный подход будет:

  • экспонирование вида
  • в каждом пакете вы пишете новую таблицу, а в конце вы заменяете только представление этой новой таблицей
  • через несколько дней вы также можете запланировать работу по уборке, которая удалит таблицы за последние X дней

Если вы хотите остаться со своим решением, почему бы просто не сделать это вместо того, чтобы иметь дело с контрольными точками?

df.write.parquet("hdfs:/directoryXYZ/PrePro_temp")\
    .mode('overwrite')

df.load("hdfs:/directoryXYZ/PrePro_temp").write.format('parquet').mode('overwrite').saveAsTable('org_table')

Конечно, вы прочитаете данные дважды, но они выглядят менее хакерскими, чем с контрольной точкой. Более того, вы можете хранить свои «промежуточные» данные в разных каталогах каждый раз, и благодаря этому вы сможете решить проблему, которую я раскрыл в начале. Даже если у вас есть ошибка, вы все равно можете принести действительную версию данных, просто выбрав хороший каталог и выполнив .write.format(...) для org_table.

...