У меня есть случай, когда я использую kafka Reactor с webflux, и я использую записи из исправления Kafka kafkaReceiver
в виде потока, а затем я применяю некоторую функцию карты, чтобы получить желаемую форму.Затем я отфильтровываю на основе результатов этого.Тем не менее, когда я добавляю фильтр / фильтр, когда я не могу остановить остальные API / прекратить HTTP-вызов и остановить тягу от Кафки.
Пример: вызов localhost / someTopic
Затем сервисный уровень устанавливает kafkareciever
с someTopic
.
Flux<?> returnFlux = kafkaSub.map(someFuntion).filter(x -> x != null).subscribe();
Когда я отменяю localhost / someTopic, я по-прежнемусмотри kafkaSub работает.как сигнал завершения не доходит до моего источника.
Все, что я хочу сделать, это отфильтровать определенные данные.