Я использую драйвер SpringData MongoDB Reactive Streams с кодом, который выполняет что-то вроде этого:
reactiveMongoOperations.changeStream(changeStreamOptions, MyObject.class)
.parallel()
.runOn(Schedulers.newParallel("my-scheduler", 4))
.map(ChangeStreamEvent::getBody)
.flatMap(o -> {
reactiveMongoOperations.findAndModify(query, update, options, MyObject.class)
})
.subscribe(this::process)
Я ожидал, что все будет выполнено в my-scheduler
.На самом деле происходит то, что операция flatMap
выполняется в my-scheduler
, а код в моем методе process()
- нет.
Может кто-нибудь объяснить, почему это так - это ошибка или нет?Я что то не так делаю?Как я могу получить все операции, определенные в Flux
, для выполнения в одном планировщике?