У меня возникли некоторые проблемы с кодом, написанным с использованием проектного реактора:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.12.RELEASE</version>
</dependency>
Пожалуйста, рассмотрите следующий код:
class Scratch {
public static void main(String[] args) {
ArrayBlockingQueue<Long> q = new ArrayBlockingQueue<>(10);
startProducer(q);
Flux.<Long> create(sink -> consumeItemsFromQueue(q, sink))
.doOnNext(ctx -> System.out.println("Processing " + ctx))
.flatMap(ctx -> Flux.push((sink)->{ throw new IllegalArgumentException("bum!");}))
.onErrorContinue((ex, obj)->{
System.err.println("Caught error "+ex.getMessage() +" in obj:" +obj);
})
.doOnNext(element -> System.out.println("Do On NExt: " + element))
.subscribe();
}
private static void consumeItemsFromQueue(ArrayBlockingQueue<Long> q, FluxSink<Long> sink) {
while (true) {
try {
sink.next(q.take());
} catch (Throwable t) {
System.err.println("Error in catch");
}
}
}
private static void startProducer(ArrayBlockingQueue<Long> q) {
Thread thread = new Thread(() -> {
while (true) {
try {
q.put(System.currentTimeMillis());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
}
}
Этот код выдает следующий вывод:
Processing 1580494319870
Caught error bum! in obj:null
Processing 1580494321871
Caught error bum! in obj:null
Согласно документации в onErrorContinue
объектом должно быть значение, вызвавшее ошибку. Поэтому я ожидаю, что это будет объект ctx
из flatMap
. Вместо этого он нулевой.
Это ошибка или мое понимание документации неверно?