Прошу прощения за беспокойство. Это действительно непростая проблема. У меня есть несколько идей по добавлению некоторых операций для поддержки такого рода интеграционного тестирования, но это сложно обойтись без нарушения базовой c семантики времени обработки потока.
Похоже, вы тестируете «настоящий» KafkaStreams приложение, в отличие от тестирования с помощью TopologyTestDriver. Мое первое предложение состоит в том, что вам будет намного удобнее проверять семантику вашего приложения с помощью TopologyTestDriver, если он соответствует вашим потребностям.
Мне кажется, что у вас может быть более одного раздела во входной топологии c (и, следовательно, ваше приложение). Если ключ 1 переходит в один раздел, а ключ 3 - в другой, вы увидите то, что наблюдали. Каждый раздел вашего приложения независимо отслеживает время потока. TopologyTestDriver отлично работает, потому что использует только один раздел, а также потому, что обрабатывает данные синхронно. В противном случае вам придется создавать свои «фиктивные» сообщения о продвижении по времени до go в том же разделе, что и ключ, который вы пытаетесь вывести sh.
Это будет особенно сложно потому что ваш «flatMap (). groupByKey ()» будет перераспределять данные. Вам нужно будет создать фиктивное сообщение, чтобы оно попало в правильный раздел после повторного разделения. Или вы можете поэкспериментировать с написанием своих фиктивных сообщений прямо в топи повторного разбиения c.
Если вам действительно нужно протестировать с KafkaStreams вместо TopologyTestDriver, я думаю, что проще всего просто написать «временное продвижение» сообщение для каждого ключа, как вы предлагали в своем вопросе. Не потому, что это строго необходимо, а потому, что это самый простой способ учесть все эти предостережения. Я также упомяну, что мы работаем над некоторыми общими улучшениями в обработке времени потока в Kafka Streams, которые должны значительно упростить ситуацию, но, конечно, сейчас это вам не поможет.