Поток потока реактора не останавливается - PullRequest
0 голосов
/ 07 февраля 2019

У меня есть случай, когда я использую kafka Reactor с webflux, и я использую записи из исправления Kafka kafkaReceiver в виде потока, а затем я применяю некоторую функцию карты, чтобы получить желаемую форму.Затем я отфильтровываю на основе результатов этого.Тем не менее, когда я добавляю фильтр / фильтр, когда я не могу остановить остальные API / прекратить HTTP-вызов и остановить тягу от Кафки.

Пример: вызов localhost / someTopic

Затем сервисный уровень устанавливает kafkareciever с someTopic.

Flux<?> returnFlux = kafkaSub.map(someFuntion).filter(x -> x != null).subscribe();

Когда я отменяю localhost / someTopic, я по-прежнемусмотри kafkaSub работает.как сигнал завершения не доходит до моего источника.

Все, что я хочу сделать, это отфильтровать определенные данные.

...