мой вопрос о потоках Kafka Ktable.groupBy.aggregate. и результирующие агрегированные значения.
ситуация
Я пытаюсь агрегировать минутные события в день.
У меня есть минутный генератор событий (не показанздесь), который генерирует события для нескольких домов. Иногда значение события неверно, и минутное событие должно быть переиздано. Минутные события публикуются в теме «минуты» .
Я делаю агрегацию этих событий в день и дом с использованием потоков КафкиgroupBy
и aggregate
.
проблема
Обычно, поскольку в дне 1440 минут, никогда не должно быть агрегирования с более чем 1440 значениями. Также никогда не должно быть агрегации с отрицательным количеством событий.
... Но это происходит в любом случае, и мы не понимаем, что не так в нашем коде.
пример кода
Вот пример упрощенного кода для иллюстрации проблемы. IllegalStateException иногда генерируется.
StreamsBuilder builder = new StreamsBuilder();
KTable<String, MinuteEvent> minuteEvents = builder.table(
"minutes",
Consumed.with(Serdes.String(), minuteEventSerdes),
Materialized.<String, MinuteEvent, KeyValueStore<Bytes, byte[]>>with(Serdes.String(), minuteEventSerdes)
.withCachingDisabled());
// preform daily aggregation
KStream<String, MinuteAggregate> dayEvents = minuteEvents
// group by house and day
.filter((key, minuteEvent) -> minuteEvent != null && StringUtils.isNotBlank(minuteEvent.house))
.groupBy((key, minuteEvent) -> KeyValue.pair(
minuteEvent.house + "##" + minuteEvent.instant.atZone(ZoneId.of("Europe/Paris")).truncatedTo(ChronoUnit.DAYS), minuteEvent),
Grouped.<String, MinuteEvent>as("minuteEventsPerHouse")
.withKeySerde(Serdes.String())
.withValueSerde(minuteEventSerdes))
.aggregate(
MinuteAggregate::new,
(String key, MinuteEvent value, MinuteAggregate aggregate) -> aggregate.addLine(key, value),
(String key, MinuteEvent value, MinuteAggregate aggregate) -> aggregate.removeLine(key, value),
Materialized
.<String, MinuteAggregate, KeyValueStore<Bytes, byte[]>>as(BILLLINEMINUTEAGG_STORE)
.withKeySerde(Serdes.String())
.withValueSerde(minuteAggSerdes)
.withLoggingEnabled(new HashMap<>())) // keep this aggregate state forever
.toStream();
// check daily aggregation
dayEvents.filter((key, value) -> {
if (value.nbValues < 0) {
throw new IllegalStateException("got an aggregate with a negative number of values " + value.nbValues);
}
if (value.nbValues > 1440) {
throw new IllegalStateException("got an aggregate with too many values " + value.nbValues);
}
return true;
}).to("days", minuteAggSerdes);
, и вот пример класса, используемый в этом фрагменте кода:
public class MinuteEvent {
public final String house;
public final double sensorValue;
public final Instant instant;
public MinuteEvent(String house,double sensorValue, Instant instant) {
this.house = house;
this.sensorValue = sensorValue;
this.instant = instant;
}
}
public class MinuteAggregate {
public int nbValues = 0;
public double totalSensorValue = 0.;
public String house = "";
public MinuteAggregate addLine(String key, MinuteEvent value) {
this.nbValues = this.nbValues + 1;
this.totalSensorValue = this.totalSensorValue + value.sensorValue;
this.house = value.house;
return this;
}
public MinuteAggregate removeLine(String key, MinuteEvent value) {
this.nbValues = this.nbValues -1;
this.totalSensorValue = this.totalSensorValue - value.sensorValue;
return this;
}
public MinuteAggregate() {
}
}
Если кто-то может сказать нам, что мы делаем здесь неправильно и почему мыиметь эти неожиданные значения, которые были бы хорошими.
дополнительные примечания
- мы настраиваем нашу потоковую работу для запуска с 4 потоками
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
- , которые мы вынуждены использовать
Ktable.groupBy().aggregate()
, поскольку значения минут могут быть переизданы с разными sensorValue
для уже опубликованного Мгновенного. И ежедневная агрегация изменена соответственно. Stream.groupBy().aggregate()
не имеет adder
И substractor
.