Я прошу прощения, если это глупый вопрос.
У меня есть сценарий, согласно которому у меня есть 3 темы из вышестоящей службы (которые не имеют ключа). К сожалению, я не могу изменить поведение 3 тем.
Основная часть службы верхнего уровня публикует все сообщения в конце дня, и мне нужно получить общее представление о транзакциях, так как заказ транзакций имеет значение для нисходящего сервиса.
Я понимаю, что не могу переупорядочить сообщения в разных разделах тем, поэтому я подумал, смогу ли я их накапливать, и тогда мой сервис мог бы взять накопленный результат и переупорядочьте их перед обработкой.
Однако я замечаю странное поведение и надеюсь, что кто-то сможет уточнить, что мне не хватает.
Когда я выполняю операцию с 1 для 500 учетных записей я вижу 500 сообщений, собранных и отображенных в выходных данных topi c.
Однако, когда я пытаюсь выполнить ту же операцию с 10 000 учетными записями, я вижу больше выходных данных, чем должно быть. (13 000 сообщений на выходе topi c).
KStream<String, TransactionAccumulator> transactions =
disbursements
.merge(repayments)
.merge(fees)
.groupBy(
(k, v) -> v.getAccountId(),
with(
String(),
serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(Transaction.class, mapper))))
.windowedBy(SessionWindows.with(Duration.of(1, ChronoUnit.MINUTES)))
.aggregate(
TransactionAccumulator::new,
(key, value, aggregate) -> aggregate.add(value),
(aggKey, aggOne, aggTwo) -> aggOne.merge(aggTwo),
Materialized.with(
String(),
serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(TransactionAccumulator.class, mapper))))
.toStream((key, value) -> key.key());
Как указывалось ранее, основная часть вышестоящей службы публикует все события в конце дня (вместо реального времени).
Буду признателен за то, что мне здесь не хватает, так как для небольших томов, похоже, работает.
Обновление 1
Я попробовал предложенное использование suppression чтобы попытаться отправить только окончательное окно.
Однако, при использовании этого, он в основном не публикует sh никаких сообщений на выходе topi c, хотя я вижу, что в "KTABLE-SUPPRESS-STATE-STORE" * есть сообщения
Обновленный код с подавлением выглядит следующим образом.
disbursements
.merge(repayments)
.merge(fees)
.groupBy(
(key, value) -> value.getAccountId(),
Grouped.with(
Serdes.String(),
Serdes.serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(Transaction.class, mapper))))
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMinutes(1)))
.aggregate(
TransactionAccumulator::new,
(key, value, aggregate) -> aggregate.add(value),
Materialized.with(
Serdes.String(),
Serdes.serdeFrom(
new JsonSerializer<>(mapper),
new JsonDeserializer<>(TransactionAccumulator.class, mapper))))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.mapValues(
value -> {
LOGGER.info(
"Sending {} Transactions for {}",
value.getTransactions().size(),
value.getAccountId());
return value;
})
.toStream((key, value) -> key.key());
Я также не вижу введенных сообщений журнала. Для ясности я использую Spring Cloud Stream в этом эксперименте, и последние записи журнала, которые я вижу в потоковом приложении, выглядят следующим образом.
INFO 23436 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [StreamConsumer-consume-applicationId-de25a238-5f0f-4d84-9bd2-3e7b01b7f0b3] State transition from REBALANCING to RUNNING
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode