Spark - при чтении множества небольших файлов паркетных файлов перед каждым состоянием отображается статус каждого файла - PullRequest
0 голосов
/ 02 ноября 2018

У меня есть сотни тысяч небольших паркетных файлов, которые я пытаюсь регулярно читать в Spark. Мое приложение работает, но перед тем, как файлы будут прочитаны с использованием узлов-исполнителей, узел драйвера, похоже, получает статус каждого отдельного файла. Я прочитал в нем немного, и это необходимо, чтобы вывести схему и разделы. Я попытался представить их так:

sparkSession.baseRelationToDataFrame(
  DataSource
    .apply(
      sparkSession,
      paths = paths, // List of thousands of parquet files in S3
      partitionColumns = Seq("my_join_column"),
      userSpecifiedSchema = Some(schema),
      className = "parquet",
      options = Seq().toMap
    )
    .resolveRelation(checkFilesExist = false)
)

Но даже при предоставлении столбцов схемы и раздела требуется некоторое время. Немного посмотрев код resolveRelation, похоже, что он все еще должен запросить статус каждого файла, чтобы построить InMemoryFileIndex.

Есть ли способ обойти эту проблему?

Я использую spark-sql 2.3.1.

1 Ответ

0 голосов
/ 04 ноября 2018

Нет хорошего способа избежать этой проблемы в текущей архитектуре Spark.

Некоторое время назад я сотрудничал с некоторыми коммиттерами Spark по проекту LazyBaseRelation, который может задерживать обнаружение информации о файле до тех пор, пока не будет известно количество разделов - в отличие от только схемы - источника данных, что Это технически необходимо, пока не будет выполнено действие, но мы так и не завершили работу. Даже тогда, когда придет время выполнить действие, вы примете удар.

Существует четыре практических подхода к ускорению обнаружения исходного файла:

  1. Используйте большой кластер, поскольку некоторые аспекты обнаружения файлов распространяются. В некоторых средах вы можете уменьшить кластер после завершения обнаружения.
  2. Сделайте начальное обнаружение до , вам необходимо использовать данные, чтобы, надеюсь, они были доступны к тому времени, когда они вам понадобятся. У нас есть петабайты данных в миллионах больших файлов Parquet с тремя уровнями разбиения. Мы используем запланированное задание для обновления индекса файла в памяти.
  3. Если вы используете Databricks, используйте Delta's OPTIMIZE, чтобы объединить маленькие файлы Parquet в меньшее, большее. Обратите внимание, что Delta стоит дополнительно.
  4. Реализуйте эквивалент OPTIMIZE самостоятельно, переписав подмножества данных. Сможете ли вы сделать это легко или нет, зависит от моделей доступа: вам нужно подумать о идемпотентности и последовательности.

Как только начальное обнаружение выполнено, кэширование списка файлов в памяти - ваш лучший друг. Есть два способа сделать это:

  • Используйте metastore, регистрируя ваши данные как внешнюю таблицу. Можно ли сделать это легко или нет, зависит от шаблонов обновления данных. Если данные разделены естественным образом, вы можете добавить разделы, используя DDL, и вы можете легко реализовать вышеприведенную стратегию (4).

  • Создайте свой собственный менеджер таблиц. Это то, что мы сделали, поскольку реализация метастазов имела недопустимые ограничения на развитие схемы. Вам нужно будет выбрать область действия: driver / JVM и SparkSession - два очевидных варианта.

Удачи!

...