Spring Reactor: контекст для каждого элемента потока - PullRequest
0 голосов
/ 09 января 2020

У меня есть потоковый источник данных (как у Кафки). И я хотел бы применить реактор к этому приложению обработки событий.

В настоящее время я создал бесконечную последовательность событий, используя EmitterProcessor. И он подписывается в первый раз один раз и никогда не отменяется.

Следующий код указывает на то, что я сделал.

public void initialize(){
    EmitterProcessor<Event> emitter = ...
    emitter.flatmap(this::step1)
           .flatmap(this::step2)
           .flatmap(this::finalStep)
         //.subscriberContext(...)
           .subscribe()
}

Для каждого события в начальном Flux<Event> мне нужно поддерживать / обновлять контекст, чтобы я мог получить все входные данные и результаты для каждого шага и сделать некоторые отчеты на последнем шаге.

Передача неизменяемого класса Context от шага к шагу - вариант, но это приведет к у всех step() есть дополнительный параметр. И не все step() будут использовать Context. В этом случае кажется уродливым, вы просто передаете Context и возвращаете Pair<Context,OtherResult>. Pair также ужасен.

Поэтому я предпочитаю что-то вроде ThreadLocal<Context>. Очевидно, что в реакторе замена составляет subscriberContext(). Однако, согласно моему коду, initialize() будет вызван один раз. Flux<Event> будет subscribe() один раз. subscriberContext не на моем уровне Event, а на уровне подписки. Так что в моем коде будет только один контекст. Это не работает.

Вопрос в том, должен ли я считать поток событий Flux<Event> или несколько Mono<Event> и делать подписку на каждое событие? Если Mono<Event> - лучшая практика, тогда я могу напрямую использовать subscriberContext(). Но есть ли какие-либо временные затраты на ассамблею (ассамблею при каждом наступающем событии)?

В реактор-кафка он делает каждую партию Record a Flux<Record>, как он может что-то реализовать как контекст уровня записи?

Спасибо.

1 Ответ

0 голосов
/ 10 января 2020

В зависимости от того, насколько поздно вам в последний раз нужна информация в этом контексте, вы можете выбрать один flatMap, чтобы создать область для каждого события и назначить им собственный контекст:

public void initialize(){
    EmitterProcessor<Event> emitter = ...
    emitter.flatMap(eventForScope ->
        Mono.just(eventForScope)
            .flatmap(this::step1)
            .flatmap(this::step2)
            .flatmap(this::finalStep)
            .subscriberContext(...) //context for ONE event
        )
        .subscribe()
}

Это может быть настроено, некоторые поздние шаги могут больше не нуждаться в контексте каждого события, поэтому вы можете переместить их за пределы внешнего flatMap, et c ...

Это работает, потому что внутри flatMap может видеть «главный контекст», но изменения внутреннего контекста не видны внешней / главной последовательности.

...