Я пытаюсь создать поток «Разница» из KStream в Kafka Java.
У меня есть входной поток, где значения представляют собой набор значений Double V0… Vn. Выходной поток должен вычислять разницу между V0 - 0, V1 - V0, V2 - V1… Vn –Vn-1.
Моей первой идеей было сделать что-то вроде этого:
KStream<String, Double> stream = builder.stream(TOPIC)
KTable<String, Double> difference = stream.groupByKey().reduce(
(oldValue, newValue) -> {
return newValue - oldValue
}
).toStream()
Допустим, у меня есть вход KStream со следующими значениями:
Key -> Value
"A1" -> 2
"B2" -> 4
"A1" -> 6
"A1" -> 10
"B2" -> 13
"A1" -> 7
Я хотел бы создать новый поток вывода со следующими значениями:
Key -> Value
"A1" -> 2 (2-0 = 2)
"B2" -> 4 (4-0 = 4)
"A1" -> 4 (6-2 = 4)
"A1" -> 4 (10-6 = 4)
"B2" -> 9 (13-4 = 9)
"A1" -> -3 (7-10 = -3)