Я хочу записать все агрегированные данные из темы Кафки в файл паркета одновременно (или, по крайней мере, в итоге получить один файл паркета в конце).
Я запускаю отдельное приложение производителя, которое помещает 50 сообщений в тему.Данные агрегируются по времени (1 день) в потребительском приложении, поэтому мне нужно собрать все данные за 1 день и посчитать их.Это работает и выполняется следующим образом:
Dataset<Row> df = spark.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVER)
.option("subscribe", "test")
.option("startingOffsets", "latest")
.option("group.id", "test")
.option("failOnDataLoss", false)
.option("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer")
.option("value.deserializer", "org.apache.kafka.common.serialization.StringSerializer")
.load()
// LEFT OUT CODE FOR READABILITY
.withWatermark("timestamp", "1 minutes")
.groupBy(
functions.window(new Column("timestamp"), "1 day", "1 day"),
new Column("container_nummer"))
.count();
Результаты затем записываются в файл партера следующим образом:
StreamingQuery query = df.writeStream()
.format("parquet")
.option("truncate", "false")
.option("checkpointLocation", "/tmp/kafka-logs")
.start("/Users/**/kafka-path");
query.awaitTermination();
Если я записываю это в консоль, я получаюправильные подсчеты для каждого дня в партии 1. При попытке записать его в паркет, я получаю только несколько пустых файлов паркета.Я читаю их так:
SparkSession spark = SparkSession
.builder()
.appName("test")
.config("spark.master", "local")
.config("spark.sql.session.timeZone", "UTC")
.getOrCreate();
Dataset<Row> df = spark.read()
.parquet("/Users/**/kafka-path/part-00000-dd416263-8db1-4166-b243-caba470adac7-c000.snappy.parquet");
df.explain();
df.show(20);
все файлы паркета кажутся пустыми (в отличие от записи их в консоль), код выше выводит это:
+------+----------------+-----+
|window|container_nummer|count|
+------+----------------+-----+
+------+----------------+-----+
IУ меня есть два вопроса:
- По каким / возможным причинам мои файлы паркета пусты?
- Можно ли в конце иметь 1 полный файл паркета со всеми данными в нем?Я хочу использовать эти данные для подачи модели машинного обучения в другую программу.
Примечание. Не требуется запускать в производство.Я просто надеюсь, что кто-то знает, как это работает ..
Заранее спасибо!