агрегат groupBy потоков kafka выдает неожиданные значения - PullRequest
2 голосов
/ 17 октября 2019

мой вопрос о потоках Kafka Ktable.groupBy.aggregate. и результирующие агрегированные значения.

ситуация

Я пытаюсь агрегировать минутные события в день.

У меня есть минутный генератор событий (не показанздесь), который генерирует события для нескольких домов. Иногда значение события неверно, и минутное событие должно быть переиздано. Минутные события публикуются в теме «минуты» .

Я делаю агрегацию этих событий в день и дом с использованием потоков КафкиgroupBy и aggregate.

проблема

Обычно, поскольку в дне 1440 минут, никогда не должно быть агрегирования с более чем 1440 значениями. Также никогда не должно быть агрегации с отрицательным количеством событий.

... Но это происходит в любом случае, и мы не понимаем, что не так в нашем коде.

пример кода

Вот пример упрощенного кода для иллюстрации проблемы. IllegalStateException иногда генерируется.


        StreamsBuilder builder = new StreamsBuilder();

        KTable<String, MinuteEvent> minuteEvents = builder.table(
                "minutes",
                Consumed.with(Serdes.String(), minuteEventSerdes),
                Materialized.<String, MinuteEvent, KeyValueStore<Bytes, byte[]>>with(Serdes.String(), minuteEventSerdes)
                        .withCachingDisabled());

        // preform daily aggregation
        KStream<String, MinuteAggregate> dayEvents = minuteEvents
                // group by house and day
                .filter((key, minuteEvent) -> minuteEvent != null && StringUtils.isNotBlank(minuteEvent.house))
                .groupBy((key, minuteEvent) -> KeyValue.pair(
                        minuteEvent.house + "##" + minuteEvent.instant.atZone(ZoneId.of("Europe/Paris")).truncatedTo(ChronoUnit.DAYS), minuteEvent),
                        Grouped.<String, MinuteEvent>as("minuteEventsPerHouse")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(minuteEventSerdes))
                .aggregate(
                        MinuteAggregate::new,
                        (String key, MinuteEvent value, MinuteAggregate aggregate) -> aggregate.addLine(key, value),
                        (String key, MinuteEvent value, MinuteAggregate aggregate) -> aggregate.removeLine(key, value),
                        Materialized
                                .<String, MinuteAggregate, KeyValueStore<Bytes, byte[]>>as(BILLLINEMINUTEAGG_STORE)
                                .withKeySerde(Serdes.String())
                                .withValueSerde(minuteAggSerdes)
                                .withLoggingEnabled(new HashMap<>())) // keep this aggregate state forever
                .toStream();

        // check daily aggregation
        dayEvents.filter((key, value) -> {
            if (value.nbValues < 0) {
                throw new IllegalStateException("got an aggregate with a negative number of values " + value.nbValues);
            }
            if (value.nbValues > 1440) {
                throw new IllegalStateException("got an aggregate with too many values " + value.nbValues);
            }
            return true;
        }).to("days", minuteAggSerdes);

, и вот пример класса, используемый в этом фрагменте кода:

    public class MinuteEvent {
        public final String house;
        public final double sensorValue;
        public final Instant instant;

        public MinuteEvent(String house,double sensorValue, Instant instant) {
            this.house = house;
            this.sensorValue = sensorValue;
            this.instant = instant;
        }
    }

    public class MinuteAggregate {
        public int nbValues = 0;
        public double totalSensorValue = 0.;
        public String house = "";

        public MinuteAggregate addLine(String key, MinuteEvent value) {
            this.nbValues = this.nbValues + 1;
            this.totalSensorValue = this.totalSensorValue + value.sensorValue;
            this.house = value.house;
            return this;
        }

        public MinuteAggregate removeLine(String key, MinuteEvent value) {
            this.nbValues = this.nbValues -1;
            this.totalSensorValue = this.totalSensorValue - value.sensorValue;
            return this;
        }

        public MinuteAggregate() {
        }
    }

Если кто-то может сказать нам, что мы делаем здесь неправильно и почему мыиметь эти неожиданные значения, которые были бы хорошими.

дополнительные примечания

  • мы настраиваем нашу потоковую работу для запуска с 4 потоками properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
  • , которые мы вынуждены использоватьKtable.groupBy().aggregate(), поскольку значения минут могут быть переизданы с разными sensorValue для уже опубликованного Мгновенного. И ежедневная агрегация изменена соответственно. Stream.groupBy().aggregate() не имеет adder И substractor.

1 Ответ

0 голосов
/ 20 октября 2019

Я думаю, на самом деле вполне возможно, что счет станет временным отрицательным.

Причина в том, что каждое обновление в вашем первом KTable отправляет два сообщения вниз по течению - старыйзначение, которое будет вычтено в агрегации в нисходящем направлении, и новое значение, которое будет добавлено в агрегации в нисходящем направлении. Оба сообщения будут обрабатываться независимо при агрегации в нисходящем направлении.

Если текущий счет равен нулю, а вычитания обрабатываются перед добавлением, счет временно становится отрицательным.

...