Я реализую потоковое потоковое приложение, которое обрабатывает файлы журналов веб-сервера из папки на диске или, возможно, S3.Spark Structured Streaming почти идеально подходит для использования, с одной складкой.Имена файлов в папке также содержат имя машины, например.как:
/ узел1 _20181101.json.gz
/ узел1 _20181102.json.gz
/ узел2 _20181101.json.gz
/ узел3 _20181102.json.gz
/ узел4 _20181102.json.gz
... и т. Д.
(упрощенная) версия источника выглядит примерно так (я бы превратил приведенный ниже в непрерывный поток с оконным управлением и т. Д.):
val inputDF = spark.read
.option("codec", classOf[GzipCodec].getName)
.option("maxFilesPerTrigger", 1.toString)
.json(config.directory)
.transform { ds =>
logger.info(ds.inputFiles)
ds
}.foreach(println(_))
Я хотел бы преобразовать пакет и добавить идентификатор узла из имени файла в каждую строку записи, - я не вижу никакого вида триггера onBatch , который я мог бы использовать для обогащения записисхема с идентификатором узла из имени файла.
Я посмотрел на следующее, и, похоже, ничего не подходит: [FileStreamSource] [https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-FileStreamSource.html#metadataLog]
К сожалению, получение дескриптора имени машины из имени файла является ключом к аналитике, которую я делаюпозже, и я не могу контролировать, как логи заполняются
Есть какие-нибудь подсказки?