Ускорьте работу InMemoryFileIndex для Spark SQL с большим количеством входных файлов - PullRequest
0 голосов
/ 02 ноября 2018

У меня есть задание apache spark sql (с использованием наборов данных), написанное на Java, которое получает от 70 000 до 150 000 файлов.

Создание InMemoryFileIndex занимает от 45 минут до 1,5 часов.

За это время нет журналов, очень низкое использование сети и почти нет загрузки ЦП.

Вот пример того, что я вижу в выводе std:

24698 [main] INFO org.spark_project.jetty.server.handler.ContextHandler  - Started o.s.j.s.ServletContextHandler@32ec9c90{/static/sql,null,AVAILABLE,@Spark}
25467 [main] INFO org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef  - Registered StateStoreCoordinator endpoint
2922000 [main] INFO org.apache.spark.sql.execution.datasources.InMemoryFileIndex  - Listing leaf files and directories in parallel under: <a LOT of file url's...>
2922435 [main] INFO org.apache.spark.SparkContext  - Starting job: textFile at SomeClass.java:103

В этом случае было 45 минут, по сути, ничего не происходило (насколько я могу судить).

Я загружаю файлы, используя:

sparkSession.read().textFile(pathsArray)

Может кто-нибудь объяснить, что происходит в InMemoryFileIndex, и как я могу сделать этот шаг быстрее?

1 Ответ

0 голосов
/ 02 ноября 2018

InMemoryFileIndex отвечает за обнаружение разделов (и, следовательно, удаление разделов), он выполняет распечатку файлов и может выполнять параллельное задание, которое может занять некоторое время, если у вас много файлов, так как он должен индексировать каждый файл. При этом Spark собирает некоторую базовую информацию о файлах (например, их размер), чтобы вычислить некоторые основные статистические данные, которые используются при планировании запросов. Если вы хотите избежать этого каждый раз, когда читаете данные, вы можете сохранить данные в виде таблицы источника данных (она поддерживается в Spark 2.1), используя metastore и команду saveAsTable (), и это обнаружение раздела будет выполнено только один раз и информация будет храниться в метастазе. Затем вы можете прочитать данные, используя metastore

sparkSession.read.table(table_name)

и это должно быть быстро, так как этот этап обнаружения раздела будет пропущен. Я рекомендую посмотреть этот доклад Spark Summit, в котором обсуждается эта проблема.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...