Spark Структурированная потоковая передача из файлов на S3 / Disk - добавить пакетное имя файла в записи / строки? - PullRequest
0 голосов
/ 26 ноября 2018

Я реализую потоковое потоковое приложение, которое обрабатывает файлы журналов веб-сервера из папки на диске или, возможно, S3.Spark Structured Streaming почти идеально подходит для использования, с одной складкой.Имена файлов в папке также содержат имя машины, например.как:

/ узел1 _20181101.json.gz

/ узел1 _20181102.json.gz

/ узел2 _20181101.json.gz

/ узел3 _20181102.json.gz

/ узел4 _20181102.json.gz

... и т. Д.

(упрощенная) версия источника выглядит примерно так (я бы превратил приведенный ниже в непрерывный поток с оконным управлением и т. Д.):

val inputDF = spark.read
  .option("codec", classOf[GzipCodec].getName)
  .option("maxFilesPerTrigger", 1.toString)
  .json(config.directory)
  .transform { ds =>
      logger.info(ds.inputFiles)
      ds
}.foreach(println(_))

Я хотел бы преобразовать пакет и добавить идентификатор узла из имени файла в каждую строку записи, - я не вижу никакого вида триггера onBatch , который я мог бы использовать для обогащения записисхема с идентификатором узла из имени файла.

Я посмотрел на следующее, и, похоже, ничего не подходит: [FileStreamSource] [https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-FileStreamSource.html#metadataLog]

К сожалению, получение дескриптора имени машины из имени файла является ключом к аналитике, которую я делаюпозже, и я не могу контролировать, как логи заполняются

Есть какие-нибудь подсказки?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...