У меня есть реактивный (Mono
) поток, который обрабатывает входящие данные в несколько этапов.Некоторые из этих шагов собирают данные регистрации в объекте Info
.Экземпляр этого объекта передается на эти этапы обработки с помощью функции Reactor subscriberContext()
(см. Project Reactor Docs ).В конце всей обработки объект Info должен быть сохранен независимо от успеха или ошибки потока.
У меня есть пример реализации для этого:
import java.time.Instant;
import reactor.core.publisher.Mono;
public class CtxTest {
static class Info {
Instant timeStamp = Instant.now();
String input;
@Override
public String toString() {
return timeStamp.toString() + ": " + input;
}
}
public static void main(String... args) {
final String contextKey = "key";
Mono<String> mono = Mono.just("hello")
.flatMap(s -> Mono.subscriberContext()
.map(ctx -> transform(s, ctx.get(contextKey))))
.doAfterTerminate(() -> saveInfo(Mono.subscriberContext().block().get(contextKey))) // does not work
.subscriberContext(ctx -> ctx.put(contextKey, new Info()));
mono.subscribe(System.out::println);
}
static String transform(String input, Info info) {
info.input = input;
return input.toUpperCase();
}
static void saveInfo(Info info) {
System.out.println(info);
}
}
Здесь, transform
- это этап обработки, который преобразует входную строку и дополнительно заполняет объект Info
данными регистрации.Я убедился, что шаг .flatMap
работает должным образом.
Однако шаг .doAfterTerminate
дает исключение java.util.NoSuchElementException: Context is empty
.Как я могу заархивировать желаемое поведение?
Примечание: я нашел несколько примеров, которые используют .doOnEach
для доступа к контексту.Если я заменим строку .doAfterTerminate
на эту строку:
.doOnEach(s -> saveInfo(s.getContext().get(contextKey)))
, я успешно вызову saveInfo
с непустым объектом контекста.Интересно, что один прогон main
вызывает функцию дважды.