Обнаружение и чтение нескольких файлов в Spark - PullRequest
3 голосов
/ 17 марта 2019

У вас есть разные системы, которые имеют разный набор файлов (txt, csv) для загрузки, преобразования и записи в файлы с использованием Apache Spark / Scala.Скажем, у SystemA есть 3 файла, а у SystemB есть 2 файла в соответствующих каталогах.

FileType       |FileNames
-----------------------------------------
Customer       |Customer_20190301.csv
Account        |Account_20190301.csv
Order          |Order_20190301.csv
OrderDetails   |OrderDetails_20190301.txt
Transactions   |Transactions_20190301.txt

Теперь я хотел бы получить имена файлов и пути на основе имени системы, указанного в качестве входных данных, чтобы я мог загрузить ихсоответствующие системные файлы.Я не хочу создавать отдельные программы для каждой системы и загружать их файлы, поскольку имена файлов или пути могут измениться в будущем.

Есть ли эффективный способ справиться с этим?Используете файлы конфигурации?Или может использовать или не использовать какие-либо внешние библиотеки?Пожалуйста, ведите меня.

1 Ответ

1 голос
/ 18 марта 2019

Эта проблема является хорошим кандидатом для подхода «разделяй и властвуй»:

  1. Опишите количество систем + любые параметры, необходимые для параметризации дальнейшей обработки. Как это сделать, зависит от среды развертывания, выбранного языка и т. Д. Единого правильного ответа не существует.

  2. Считать информацию из (1) в структуру данных.

  3. Создание списка файлов для обработки с использованием некоторой комбинации (2) и, возможно, (рекурсивного) списка каталогов. Обратите внимание, что при заданном пути вы можете получить файловую систему Hadoop в Spark, используя FileSystem.get(new java.net.URI(path), new Configuration()).

  4. Группировка файлов по типу.

  5. Для каждой группы параметризовать DataFrameReader из spark.read соответствующим образом и вызвать версию загрузки для многих путей, используя .load(paths: _*). Вы можете обобщить этот код, создав карту имени группы для функции, которая возвращает DataFrameReader.

Вот пример того, как это сделать (5):

val readers: Map[String, SparkSession => DataFrameReader] = Map(
  "customer" -> ((spark: SparkSession) => spark.read.option("format", "csv"))
)

val groups: Map[String, Seq[String]] = ???

groups.map { case (groupName, paths) =>
  readers(groupName)(spark).load(paths: _*)
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...