Мне нужно пакетировать мой поток Kafka во временные окна по 10 минут каждое, а затем запустить некоторую пакетную обработку для него.
Примечание: записи ниже имеют поле метки времени
val records = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerPool)
.option("subscribe", topic)
.option("startingOffsets", kafkaOffset)
.load()
Iдобавить временное окно к каждой записи, используя,
.withColumn("window", window($"timing", windowDuration))
Я создал несколько вспомогательных классов, таких как
case class TimingWindow(
start: java.sql.Timestamp,
end: java.sql.Timestamp
)
case class RecordWithWindow(
record: MyRecord,
groupingWindow: TimingWindow
)
Теперь у меня есть DF типа [RecordWithWindow]
Всеэто работает очень хорошо.
Далее,
metricsWithWindow
.groupByKey(_.groupingWindow)
//By grouping, I get several records per time window
//resulting an object of the below type which I write out to HDFS
case class WindowWithRecords(
records: Seq[MyRecord],
window: TimingWindow
)
Где я исследую HDFS,
Пример :
Ожидаемый : каждый объект WindowWithRecords имеет уникальный TimingWindow
WindowWithRecordsA(TimingWindowA, Seq(MyRecordA, MyRecordB, MyRecordC))
Actual : более одного объекта WindowWithRecords с одним и тем же TimingWindow
WindowWithRecordsA(TimingWindowA, Seq(MyRecordA, MyRecordB))
WindowWithRecordsB(TimingWindowA, Seq(MyRecordC))
выглядит какЛогика groupByKey не работает.
Надеюсь, мой вопрос понятен.Любые указатели будут полезны.