Полагаю, вы могли бы
KStream[] streams = stream.leftJoin(table,...).branch(...);
stream[1].transform(...).to("input-topic");
. Вы используете ветвь для помещения объединенных записей в первый поток и несвязанных записей во второй поток.Второй поток передается в transform()
, который использует хранилище состояний для буферизации этих записей, и вы можете context.forward()
отправлять их, используя punctuations
с 5-секундной задержкой.