В настоящее время я создаю агрегатор необработанных данных журнала, используя Spark Structured Streaming.
Inputstream создается с каталогом текстовых файлов:
// == Input == //
val logsDF = spark.readStream
.format("text")
.option("maxFilesPerTrigger", 1)
.load("input/*")
Затем логи анализируются ...
// == Parsing == //
val logsDF2 = ...
... и агрегировано
// == Aggregation == //
val windowedCounts = logsDF2
.withWatermark("window_start", "15 minutes")
.groupBy(
col("window"),
col("node")
).count()
Все работает нормально, когда я использую "консольную" раковину: результаты обновляются пакетно по ванне в консоли:
// == Output == //
val query = windowedCounts.writeStream
.format("console")
.outputMode("complete")
.start()
.awaitTermination()
Теперь я хочу сохранить свои результаты в одном уникальном файле (json, parquet, csv ..)
// == Output == //
val query = windowedCounts.writeStream
.format("csv")
.option("checkpointLocation", "checkpoint/")
.start("output/")
.awaitTermination()
Но он выводит мне 400 пустых CSV ... Как я могу получить свои результаты, как я сделал в консоли?
Большое спасибо!