Если я правильно понимаю ваш вопрос: всякий раз, когда запускается внешний вызов REST, вы хотите запустить отдельное потоковое приложение для чтения из карантинной темы B, пытаясь десериализовать данные в каком-то обновленном формате и, если это удастся, нажать натема «хорошие данные» C, и это потоковое приложение должно автоматически останавливаться, когда оно достигает конца темы B.
В этом случае, и, если у вас нет требований к порядку оформления в последней теме C, вывнутренне может использовать «флаг остановки», который поток вызывающих абонентов KafkaStreams может блокировать и ожидать, в то время как внутренний поток потока KafkaStreams может установить разблокировку потоков вызывающих абонентов, чтобы в конечном итоге вызвать «KafkaStreams.close ()».Например, вы можете использовать функцию пунктуации, которая проверяет, нет ли новых данных после последнего периода пунктуации, указывая, что мы, вероятно, исчерпали все данные из темы B, и в этом случае установите флаг.
Пример можно найти в собственном коде бенчмаркинга Streams: https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java#L657-L673, но обратите внимание, что он основан не на пунктуации, а на логике обработки, проверяя текущее обработанное содержимое данных, поскольку он точно знает, как будет выглядеть «последняя запись»,Но общая идея использования такой отключающей защелки та же.