У меня есть потоковый источник данных (как у Кафки). И я хотел бы применить реактор к этому приложению обработки событий.
В настоящее время я создал бесконечную последовательность событий, используя EmitterProcessor. И он подписывается в первый раз один раз и никогда не отменяется.
Следующий код указывает на то, что я сделал.
public void initialize(){
EmitterProcessor<Event> emitter = ...
emitter.flatmap(this::step1)
.flatmap(this::step2)
.flatmap(this::finalStep)
//.subscriberContext(...)
.subscribe()
}
Для каждого события в начальном Flux<Event>
мне нужно поддерживать / обновлять контекст, чтобы я мог получить все входные данные и результаты для каждого шага и сделать некоторые отчеты на последнем шаге.
Передача неизменяемого класса Context
от шага к шагу - вариант, но это приведет к у всех step()
есть дополнительный параметр. И не все step()
будут использовать Context
. В этом случае кажется уродливым, вы просто передаете Context
и возвращаете Pair<Context,OtherResult>
. Pair
также ужасен.
Поэтому я предпочитаю что-то вроде ThreadLocal<Context>
. Очевидно, что в реакторе замена составляет subscriberContext()
. Однако, согласно моему коду, initialize()
будет вызван один раз. Flux<Event>
будет subscribe()
один раз. subscriberContext
не на моем уровне Event
, а на уровне подписки. Так что в моем коде будет только один контекст. Это не работает.
Вопрос в том, должен ли я считать поток событий Flux<Event>
или несколько Mono<Event>
и делать подписку на каждое событие? Если Mono<Event>
- лучшая практика, тогда я могу напрямую использовать subscriberContext()
. Но есть ли какие-либо временные затраты на ассамблею (ассамблею при каждом наступающем событии)?
В реактор-кафка он делает каждую партию Record
a Flux<Record>
, как он может что-то реализовать как контекст уровня записи?
Спасибо.