Реактор проекта: доступ к контексту после завершения - PullRequest
0 голосов
/ 27 июня 2019

У меня есть реактивный (Mono) поток, который обрабатывает входящие данные в несколько этапов.Некоторые из этих шагов собирают данные регистрации в объекте Info.Экземпляр этого объекта передается на эти этапы обработки с помощью функции Reactor subscriberContext() (см. Project Reactor Docs ).В конце всей обработки объект Info должен быть сохранен независимо от успеха или ошибки потока.

У меня есть пример реализации для этого:

import java.time.Instant;
import reactor.core.publisher.Mono;

public class CtxTest {

    static class Info {
        Instant timeStamp = Instant.now();
        String input;
        @Override
        public String toString() {
            return timeStamp.toString() + ": " + input;
        }
    }

    public static void main(String... args) {
        final String contextKey = "key";

        Mono<String> mono = Mono.just("hello")
                .flatMap(s -> Mono.subscriberContext()
                        .map(ctx -> transform(s, ctx.get(contextKey))))
                .doAfterTerminate(() -> saveInfo(Mono.subscriberContext().block().get(contextKey))) // does not work
                .subscriberContext(ctx -> ctx.put(contextKey, new Info()));

        mono.subscribe(System.out::println);
    }

    static String transform(String input, Info info) {
        info.input = input;
        return input.toUpperCase();
    }

    static void saveInfo(Info info) {
        System.out.println(info);
    }
}

Здесь, transform - это этап обработки, который преобразует входную строку и дополнительно заполняет объект Info данными регистрации.Я убедился, что шаг .flatMap работает должным образом.

Однако шаг .doAfterTerminate дает исключение java.util.NoSuchElementException: Context is empty.Как я могу заархивировать желаемое поведение?

Примечание: я нашел несколько примеров, которые используют .doOnEach для доступа к контексту.Если я заменим строку .doAfterTerminate на эту строку:

.doOnEach(s -> saveInfo(s.getContext().get(contextKey)))

, я успешно вызову saveInfo с непустым объектом контекста.Интересно, что один прогон main вызывает функцию дважды.

...