Я написал поток, который принимает сообщения и отправляет таблицу появившихся ключей.Если что-то появится, будет показано число 1. Это упрощенная версия моего производственного кода, чтобы продемонстрировать ошибку.В режиме реального времени сообщение отправляется для каждого полученного сообщения.
Однако, когда я запускаю его в модульном тесте с использованием ProcessorTopologyTestDriver, я получаю другое поведение.Если ключ, который уже был просмотрен ранее, получен, я получаю дополнительное сообщение.
Если я отправляю сообщения с ключами «key1», затем «key2», затем «key1», я получаю следующий вывод.
key1 - 1
key2 - 1
key1 - 0
key1 - 1
По какой-то причине он уменьшает значение перед добавлением. Это происходит только при использовании ProcessorTopologyTestDriver.Это ожидается?Есть ли работа вокруг?Или это ошибка?
Вот моя топология:
final StreamsBuilder builder = new StreamsBuilder();
KGroupedTable<String, String> groupedTable
= builder.table(applicationConfig.sourceTopic(), Consumed.with(Serdes.String(), Serdes.String()))
.groupBy((key, value) -> KeyValue.pair(key, value), Serialized.with(Serdes.String(), Serdes.String()));
KTable<String, Long> countTable = groupedTable.count();
KStream<String, Long> countTableAsStream = countTable.toStream();
countTableAsStream.to(applicationConfig.outputTopic(), Produced.with(Serdes.String(), Serdes.Long()));
Вот мой код модульного теста:
TopologyWithGroupedTable top = new TopologyWithGroupedTable(appConfig, map);
Topology topology = top.get();
ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, topology);
driver.process(inputTopic, "key1", "theval", Serdes.String().serializer(), Serdes.String().serializer());
driver.process(inputTopic, "key2", "theval", Serdes.String().serializer(), Serdes.String().serializer());
driver.process(inputTopic, "key1", "theval", Serdes.String().serializer(), Serdes.String().serializer());
ProducerRecord<String, Long> outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
assertEquals("key1", outputRecord.key());
assertEquals(Long.valueOf(1L), outputRecord.value());
outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
assertEquals("key2", outputRecord.key());
assertEquals(Long.valueOf(1L), outputRecord.value());
outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
assertEquals("key1", outputRecord.key());
assertEquals(Long.valueOf(1L), outputRecord.value()); //this fails, I get 0. If I pull another message, it shows key1 with a count of 1
Вот репозиторий полного кода:
https://bitbucket.org/nsinha/testtopologywithgroupedtable/src/master/
Топология потока: https://bitbucket.org/nsinha/testtopologywithgroupedtable/src/master/src/main/java/com/nick/kstreams/TopologyWithGroupedTable.java
Код теста: https://bitbucket.org/nsinha/testtopologywithgroupedtable/src/master/src/test/java/com/nick/kstreams/TopologyWithGroupedTableTests.java