Когда фильтр добавляется в поток после сбоя предиката, поток удаляется.
Когда фильтр не применяется, он идет до конца
Flux<String> f = Flux.<String>create(emitter -> {
FluxSink<String> e = emitter;
Thread t = new Thread(()-> {
for(int i =0 ; i< 10; i++)
{
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {}
e.next(String.valueOf(i));
}
});
emitter.onRequest(v -> t.start());
emitter.onDispose(() -> t.interrupt());
});
f.filter(s -> Integer.valueOf(s) % 3 == 0).subscribe(System.out::println);
То, что я ожидал, это напечатать 0, 3, 6, 9. Но он печатает только 0, и когда предикат фильтра терпит неудачу на '1', он удаляет поток.
Какую ошибку я совершил? Есть ли другой способ получить мое ожидаемое поведение