Насколько я понимаю, основная цель _spark_metadata
состояла в том, чтобы обеспечить отказоустойчивость и избежать перечисления всех файлов для обработки:
Чтобы правильно обрабатывать частичные сбои при сохранении
ровно однажды семантика, файлы для каждого пакета записываются в
уникальный каталог, а затем атомарно добавляется в журнал метаданных. когда
паркет на основе DataSource
инициализируется для чтения, мы сначала
проверьте этот каталог журналов и используйте его вместо списка файлов, когда
нет.
https://github.com/apache/spark/commit/6bc4be64f86afcb38e4444c80c9400b7b6b745de
Ссылка, которую вы указали ( отключение _spark_metadata в структурированной потоковой передаче в spark 2.3.0 ) объясняет, что проблема возникла из-за несовместимого состояния контрольной точки - контрольная точка сгенерировала метаданные, но позже пользователь удалил их вручную и после перезапуска запрос не выполнен, поскольку на контрольной точке ожидается файл метаданных.
Чтобы узнать, не приведет ли отсутствие метаданных к вашей пакетной обработке, взгляните на метод org.apache.spark.sql.execution.datasources.DataSource # resolRelation, где вы можете найти сопоставление с шаблоном в 2 случаях:
// We are reading from the results of a streaming query. Load files from the metadata log
// instead of listing them using HDFS APIs.
case (format: FileFormat, _)
if FileStreamSink.hasMetadata(
caseInsensitiveOptions.get("path").toSeq ++ paths,
sparkSession.sessionState.newHadoopConf()) =>
case (format: FileFormat, _) =>
val globbedPaths =
checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
И hasMetadata
метод выглядит так:
def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = {
path match {
case Seq(singlePath) =>
try {
val hdfsPath = new Path(singlePath)
val fs = hdfsPath.getFileSystem(hadoopConf)
if (fs.isDirectory(hdfsPath)) {
fs.exists(new Path(hdfsPath, metadataDir))
} else {
false
}
} catch {
case NonFatal(e) =>
logWarning(s"Error while looking for metadata directory.")
false
}
case _ => false
}
}
Как видите, нет риска сбоя (по крайней мере, при чтении кода!). Если у вас есть, пожалуйста, дайте больше контекста, потому что, возможно, проблема в другом месте.
Что касается вашей производительности, этот _spark_metadata
содержит только список файлов, поэтому, конечно, Spark сначала должен перечислить файлы из вашего входного каталога. Но по моему опыту это не самая дорогая операция. Например, перечисление каталога с 1297 файлами на AWS S3 занимает приблизительно 9 секунд. После этого вам решать, хотите ли вы провести простой процесс очистки или немного более медленную пакетную обработку. Если у вас таких файлов намного больше, может быть, вам также следует сгруппировать их в более крупные файлы, например, 256 МБ или больше?
Тем не менее, если вы хотите сохранить _spark_metadata
, возможно, есть способ удалить файлы с помощью вашего приложения для очистки. Но это будет сложно, так как у вас будет 2 приложения (потоковая передача и очистка), работающих с одними и теми же данными.
Более подробную информацию о _spark_metadata
можно найти здесь: Как изменить расположение каталога _spark_metadata?