У меня есть вариант использования для создания ежечасного снимка потребляемых данных из Kafka topi c. Я использую структурированную потоковую передачу искр, чтобы получать данные из Kafka и печатать их на консоли, следуя стандартной документации. Я новичок в структурированной потоковой передаче и не знаю, как приступить к созданию ежечасных снимков. Может ли кто-нибудь помочь мне в этом?
Изменить: текущий статус
Предположим, что структура события, исходящего от кафки,
id || имя || position
, где id однозначно определяет запись. У меня может быть несколько событий для id, и я хочу рассмотреть последнее событие.
Предположим, я запустил Spark pipeline в 11:00. Я хочу объединить все события, которые происходят с 11:00 до 12:00 на основе метаданных Kafka, и создать снимок, который можно выгрузить в файл.
id || имя || pos
2 || ron || 3.76
1 || can || 2.68
4 || barn || 4.6
Точно так же я хочу агрегировать данные за каждый час и генерировать консолидированное представление за текущий час и выгружать его в файл.
Я только что написал basi c потребитель, который выводит все события на консоль
val df = spark.readStream.format("kafka")
.option("localhost:9092").option("subscribe", "geo-location").option("failOnDataLoss", true)
.load.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING)
val query = df.writeStream
.format("console")
.option("truncate", "false")
Пожалуйста, дайте мне знать, если требуется дополнительная информация. Спасибо!