Если у меня есть записи ниже
{"A",7}
{"B",10}
{"C",10}
Тогда агрегация должна быть
{"sum_ABC",27}
Затем, когда A изменяется на 10, т.е. добавляется новое сообщение
{"A",10}
Теперь он должен вычисляться как
{"sum_ABC",30}
val record: KTable[String, JsonNode] = builder.table("logs",m_consumed)
val aggVal: KTable[String, Double] = record.toStream().groupByKey()
.reduce(new Reducer[Double]() {
def apply(val1: Double, val2: Double): Double =
{
println(val1)
val1 + val2
}
})
Это не работает. Он продолжает добавлять значения, а при перезапуске напрямую добавляет 0 + новое значение.До сих пор я понял, что мне нужно использовать агрегат, а не сокращение.Пожалуйста, руководство, как это сделать.Любые ссылки или учебник?