Моя цель - подсчитывать сообщения об успехах и неудачах от источника к месту назначения в секунду и суммировать их результаты в ежедневной базе.
У меня было два варианта сделать это;
- события потока, затем сгруппировать их время # источник # назначение
KeyValueBytesStoreSupplier streamStore = Stores.persistentKeyValueStore("store-name");
sourceStream.selectKey((k, v) -> v.getDataTime() + KEY_SEPERATOR + SRC + KEY_SEPERATOR + DEST ).groupByKey().aggregate(
DO SOME Aggregation,
Materialized.<String, AggregationObject>as(streamStore)
.withKeySerde(Serdes.String())
.withValueSerde(AggregationObjectSerdes));
После попытки описанного выше подхода мы заметили, что хранилище состояний увеличивается из-за увеличения количества уникальных ключей, и, если я прав, из-за того, что темы состояний только "компактные", они никогда не истекают.
NumberOfUniqueKeys = 86,400 секунд в день X ИСТОЧНИК X НАЗНАЧЕНИЕ
Затем мы подумали, что если мы не поместим поле времени в блок KEY, мы сможем уменьшить размер хранилища состояний. Мы попробовали работу с окнами в качестве второго подхода.
с использованием оконных операций с persistentWindowStore, CustomTimeStampExtractor, WindowBy, Suppress
WindowBytesStoreSupplier streamStore = Stores.persistentWindowStore("store-name", Duration.ofHours(6), Duration.ofSeconds(1), false);
sourceStream.selectKey((k, v) -> SRC + KEY_SEPERATOR + DEST)
.groupByKey() .windowedBy(TimeWindows.of(Duration.ofSeconds(1)).grace(Duration.ofSeconds(5)))
.aggregate(
{
DO SOME Aggregation
}, Materialized.<String, AggregationObject>as(streamStore)
.withKeySerde(Serdes.String())
.withValueSerde(AggregationObjectSerdes))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())).toStream();`
Попробовав второй подход, мы уменьшили размер хранилища состояний, но теперь у нас возникла проблема с поздними событиями. Затем мы добавили период отсрочки в 5 секунд с операцией подавления, и, кроме того, использование периода отсрочки и операции подавления не гарантировало обработку всех запоздалых событий, еще одним побочным эффектом операции подавления является задержка, поскольку она генерирует результат агрегирования после периода отсрочки окна.
BTW
использование оконной операции привело к появлению ПРЕДУПРЕЖДАЮЩЕГО сообщения типа «WARN 1 --- [-StreamThread-2] oaksstate.internals.WindowKeySchema: Warning: window end время было усечено до Long.MAX "
Я проверил причину в исходном коде и нашел отсюда https://github.com/a0x8o/kafka/blob/master/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
/**
* Safely construct a time window of the given size,
* taking care of bounding endMs to Long.MAX_VALUE if necessary
*/
static TimeWindow timeWindowForSize(final long startMs,
final long windowSize) {
long endMs = startMs + windowSize;
if (endMs < 0) {
LOG.warn("Warning: window end time was truncated to Long.MAX");
endMs = Long.MAX_VALUE;
}
return new TimeWindow(startMs, endMs);
}
НО на самом деле для меня не имеет никакого смысла, как endMs может быть ниже 0 ...
Вопросы?
- Что, если мы go закончим с подходом 1, как можем ли мы уменьшить размер государственного магазина? В подходе 1 было гарантировано, что все события будут обработаны, и не будет пропущенного события из-за задержки.
- Что, если мы go перейдем к подходу 2, как мне настроить мой лог c и поймать данные с опозданием и уменьшить задержку?
- Почему я получаю предупреждающее сообщение в подходе 2, хотя все поля времени в моей модели положительны?
- Какие еще варианты вы можете предложить тогда эти два подхода?
Мне нужна помощь специалиста :)
BR,