Spark Структурированная потоковая передача writeStream для вывода одного глобального CSV - PullRequest
0 голосов
/ 11 сентября 2018

В настоящее время я создаю агрегатор необработанных данных журнала, используя Spark Structured Streaming.

Inputstream создается с каталогом текстовых файлов:

// == Input == //

val logsDF = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", 1)
  .load("input/*")

Затем логи анализируются ...

// == Parsing == //

val logsDF2 = ...

... и агрегировано

// == Aggregation == //

val windowedCounts = logsDF2
  .withWatermark("window_start", "15 minutes")
  .groupBy(
    col("window"),
    col("node")
  ).count()

Все работает нормально, когда я использую "консольную" раковину: результаты обновляются пакетно по ванне в консоли:

// == Output == //

val query = windowedCounts.writeStream
  .format("console")
  .outputMode("complete")
  .start()
  .awaitTermination()

Теперь я хочу сохранить свои результаты в одном уникальном файле (json, parquet, csv ..)

// == Output == //

val query = windowedCounts.writeStream
  .format("csv")
  .option("checkpointLocation", "checkpoint/")
  .start("output/")
  .awaitTermination()

Но он выводит мне 400 пустых CSV ... Как я могу получить свои результаты, как я сделал в консоли?

Большое спасибо!

...