Kafka Streams - GroupBy - Позднее событие - persistentWindowStore - WindowBy с периодом отсрочки и подавлением - PullRequest
0 голосов
/ 08 мая 2020

Моя цель - подсчитывать сообщения об успехах и неудачах от источника к месту назначения в секунду и суммировать их результаты в ежедневной базе.

У меня было два варианта сделать это;

  1. события потока, затем сгруппировать их время # источник # назначение
KeyValueBytesStoreSupplier streamStore = Stores.persistentKeyValueStore("store-name");  
sourceStream.selectKey((k, v) -> v.getDataTime() + KEY_SEPERATOR + SRC + KEY_SEPERATOR + DEST ).groupByKey().aggregate(
DO SOME Aggregation,
    Materialized.<String, AggregationObject>as(streamStore)
                .withKeySerde(Serdes.String())
                .withValueSerde(AggregationObjectSerdes));

После попытки описанного выше подхода мы заметили, что хранилище состояний увеличивается из-за увеличения количества уникальных ключей, и, если я прав, из-за того, что темы состояний только "компактные", они никогда не истекают.

NumberOfUniqueKeys = 86,400 секунд в день X ИСТОЧНИК X НАЗНАЧЕНИЕ

Затем мы подумали, что если мы не поместим поле времени в блок KEY, мы сможем уменьшить размер хранилища состояний. Мы попробовали работу с окнами в качестве второго подхода.

с использованием оконных операций с persistentWindowStore, CustomTimeStampExtractor, WindowBy, Suppress

WindowBytesStoreSupplier streamStore = Stores.persistentWindowStore("store-name", Duration.ofHours(6), Duration.ofSeconds(1), false);

sourceStream.selectKey((k, v) -> SRC + KEY_SEPERATOR + DEST)
.groupByKey()  .windowedBy(TimeWindows.of(Duration.ofSeconds(1)).grace(Duration.ofSeconds(5)))
.aggregate(
    {
       DO SOME Aggregation
    }, Materialized.<String, AggregationObject>as(streamStore)
        .withKeySerde(Serdes.String())
        .withValueSerde(AggregationObjectSerdes))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())).toStream();`

Попробовав второй подход, мы уменьшили размер хранилища состояний, но теперь у нас возникла проблема с поздними событиями. Затем мы добавили период отсрочки в 5 секунд с операцией подавления, и, кроме того, использование периода отсрочки и операции подавления не гарантировало обработку всех запоздалых событий, еще одним побочным эффектом операции подавления является задержка, поскольку она генерирует результат агрегирования после периода отсрочки окна.

BTW

использование оконной операции привело к появлению ПРЕДУПРЕЖДАЮЩЕГО сообщения типа «WARN 1 --- [-StreamThread-2] oaksstate.internals.WindowKeySchema: Warning: window end время было усечено до Long.MAX "

Я проверил причину в исходном коде и нашел отсюда https://github.com/a0x8o/kafka/blob/master/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java

/**
 * Safely construct a time window of the given size,
 * taking care of bounding endMs to Long.MAX_VALUE if necessary
 */
static TimeWindow timeWindowForSize(final long startMs,
                                    final long windowSize) {
    long endMs = startMs + windowSize;

    if (endMs < 0) {
        LOG.warn("Warning: window end time was truncated to Long.MAX");
        endMs = Long.MAX_VALUE;
    }
    return new TimeWindow(startMs, endMs);
}

НО на самом деле для меня не имеет никакого смысла, как endMs может быть ниже 0 ...

Вопросы?

  1. Что, если мы go закончим с подходом 1, как можем ли мы уменьшить размер государственного магазина? В подходе 1 было гарантировано, что все события будут обработаны, и не будет пропущенного события из-за задержки.
  2. Что, если мы go перейдем к подходу 2, как мне настроить мой лог c и поймать данные с опозданием и уменьшить задержку?
  3. Почему я получаю предупреждающее сообщение в подходе 2, хотя все поля времени в моей модели положительны?
  4. Какие еще варианты вы можете предложить тогда эти два подхода?

Мне нужна помощь специалиста :)

BR,

1 Ответ

0 голосов
/ 13 мая 2020

Согласно почтовой группе kafka о предупреждающем сообщении

ПРЕДУПРЕЖДЕНИЕ сообщение типа «WARN 1 --- [-StreamThread-2] oaksstate.internals.WindowKeySchema: Предупреждение: время окончания окна было усечено до Long.MAX "

Мне было написано:

Вы можете получить это сообщение" oaksstate.internals.WindowKeySchema: Предупреждение: время окончания окна было усечено до Long.MAX "", когда ваш TimeWindowDeserializer создается без a windowSize. Есть два конструктора для TimeWindowDeserializer, используете ли вы тот, который с WindowSize?

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java#L46 -L55

Он вызывает WindowKeySchema с Long.MAX_VALUE https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java#L84 -L90

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...