Как правильно обрабатывать разделенные паркетные файлы, созданные в Spark Streaming - PullRequest
3 голосов
/ 11 апреля 2019

Моя структурированная потоковая работа Spark непрерывно генерирует файлы паркета, которые я хочу удалить после истечения срока (скажем, через 30 дней).

Я храню свои данные паркета, разделенные с ключом разделения, являющимся датой события в RFC3339 / ISO8601, так что ведение домашнего хозяйства может быть довольно легко выполнено на уровне HDFS на основе задания cron (Удалите все папки с паркетами с помощью ключа ключа <старейшегоAllowedAgeс точки зрения сравнения строк).</p>

Однако, так как я представил потоковую передачу Spark, Spark записывает метаданные в папку с именем _spark_metadata рядом с самими записываемыми данными.Если я сейчас просто удалю файлы HDFS с истекшим сроком действия и запусту пакетное задание spark для всего набора данных, задание не будет выполнено из-за отсутствия файлов.Пакетная работа будет читать метаданные и ожидать, что уже удаленные файлы будут существовать.

Простое решение для этого - просто отключить создание каталога _spark_metadata, как описано здесь: отключение _spark_metadata в структурированной потоковой передаче в spark 2.3.0 .Но так как я не хочу терять производительность при чтении данных для моего регулярного пакетного анализа, мне интересно, не найдется ли лучшего решения.

Я подумал, что тогда я мог бы просто использовать spark для удаления, чтобы он удалял hdf-файлы паркета И обновляет метаданные.Однако простое выполнение

session.sql(String.format("DELETE FROM parquet.`%s` WHERE partitionKey < " + oldestAllowedPartitionAge, path.toString()));

не работает.DELETE к сожалению, это неподдерживаемая операция в Spark ...

Есть ли какое-либо решение, чтобы я мог удалить старые данные, но при этом папка _spark_metadata работала?

Ответы [ 2 ]

2 голосов
/ 12 апреля 2019

Насколько я понимаю, основная цель _spark_metadata состояла в том, чтобы обеспечить отказоустойчивость и избежать перечисления всех файлов для обработки:

Чтобы правильно обрабатывать частичные сбои при сохранении ровно однажды семантика, файлы для каждого пакета записываются в уникальный каталог, а затем атомарно добавляется в журнал метаданных. когда паркет на основе DataSource инициализируется для чтения, мы сначала проверьте этот каталог журналов и используйте его вместо списка файлов, когда нет.

https://github.com/apache/spark/commit/6bc4be64f86afcb38e4444c80c9400b7b6b745de

Ссылка, которую вы указали ( отключение _spark_metadata в структурированной потоковой передаче в spark 2.3.0 ) объясняет, что проблема возникла из-за несовместимого состояния контрольной точки - контрольная точка сгенерировала метаданные, но позже пользователь удалил их вручную и после перезапуска запрос не выполнен, поскольку на контрольной точке ожидается файл метаданных.

Чтобы узнать, не приведет ли отсутствие метаданных к вашей пакетной обработке, взгляните на метод org.apache.spark.sql.execution.datasources.DataSource # resolRelation, где вы можете найти сопоставление с шаблоном в 2 случаях:

  // We are reading from the results of a streaming query. Load files from the metadata log
  // instead of listing them using HDFS APIs.
  case (format: FileFormat, _)
      if FileStreamSink.hasMetadata(
        caseInsensitiveOptions.get("path").toSeq ++ paths,
        sparkSession.sessionState.newHadoopConf()) =>
  case (format: FileFormat, _) =>
    val globbedPaths =
      checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)

И hasMetadata метод выглядит так:

  def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = {
    path match {
      case Seq(singlePath) =>
        try {
          val hdfsPath = new Path(singlePath)
          val fs = hdfsPath.getFileSystem(hadoopConf)
          if (fs.isDirectory(hdfsPath)) {
            fs.exists(new Path(hdfsPath, metadataDir))
          } else {
            false
          }
        } catch {
          case NonFatal(e) =>
            logWarning(s"Error while looking for metadata directory.")
            false
        }
      case _ => false
    }
  }

Как видите, нет риска сбоя (по крайней мере, при чтении кода!). Если у вас есть, пожалуйста, дайте больше контекста, потому что, возможно, проблема в другом месте.

Что касается вашей производительности, этот _spark_metadata содержит только список файлов, поэтому, конечно, Spark сначала должен перечислить файлы из вашего входного каталога. Но по моему опыту это не самая дорогая операция. Например, перечисление каталога с 1297 файлами на AWS S3 занимает приблизительно 9 секунд. После этого вам решать, хотите ли вы провести простой процесс очистки или немного более медленную пакетную обработку. Если у вас таких файлов намного больше, может быть, вам также следует сгруппировать их в более крупные файлы, например, 256 МБ или больше?

Тем не менее, если вы хотите сохранить _spark_metadata, возможно, есть способ удалить файлы с помощью вашего приложения для очистки. Но это будет сложно, так как у вас будет 2 приложения (потоковая передача и очистка), работающих с одними и теми же данными.

Более подробную информацию о _spark_metadata можно найти здесь: Как изменить расположение каталога _spark_metadata?

0 голосов
/ 14 апреля 2019

На самом деле это одна из известных проблем в структурированной потоковой передаче ( SPARK-24295 ), хотя это происходит только с массивными входными файлами, и конечные пользователи используют собственные обходные пути. Например, остановить запрос -> удалить старые входные файлы -> манипулировать метаданными вручную, чтобы очистить их -> перезапустить запрос.

Учитывая, что манипулирование метаданными вручную не является тривиальным и не идеальным (поскольку оно должно остановить потоковый запрос и заставить конечных пользователей понимать формат метаданных), SPARK-27188 предлагается в качестве альтернативы - он применяется сохранение и удаление устаревших входных файлов из метаданных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...