У меня есть Spring WebFlux контроллер:
@RestController
public class Controller
{
@PostMapping("/doStuff")
public Mono<Response> doStuff(@RequestBody Mono<Request> request)
{
...
}
}
Теперь, скажем, я хотел связать отдельные запросы , поступающие на этот контроллер от разных клиентов, к групповой обработке на основе некоторого свойства объекта Request
.
дубль 1:
@PostMapping("/doStuff")
public Mono<Response> doStuff(@RequestBody Mono<Request> request)
{
return request.flux()
.groupBy(r -> r.someProperty())
.flatMap(gf -> gf.map(r -> doStuff(r)));
}
Это не будет работать, потому что каждый вызов получит свой экземпляр потока. Весь вызов flux()
не имеет смысла, всегда будет один объект Request
, проходящий через поток, даже если многие из этих потоков запускаются одновременно в результате одновременных вызовов от клиентов. Я понимаю, что мне нужна некоторая часть потока, которая распределяется между всеми запросами, где я мог бы выполнить свою группировку, что привело меня к этому слегка сконструированному коду
Взять 2:
private AtomicReference<FluxSink<Request>> sink = new AtomicReference<>();
private Flux<Response> serializingStream;
public Controller()
{
this.serializingStream =
Flux.<Request>create(fluxSink -> sink.set(fluxSink), ERROR)
.groupBy(r -> r.someProperty())
.flatMap(gf -> gf.map(r -> doStuff(r)));
.publish()
.autoConnect();
this.serializingStream.subscribe().dispose(); //dummy subscription to set the sink;
}
@PostMapping("/doStuff")
public Mono<Response> doStuff(@RequestBody Request request)
{
req.setReqId(UUID.randomUUID().toString());
return
serializingStream
.doOnSubscribe(__ -> sink.get().next(req))
.filter(resp -> resp.getReqId().equals(req.getReqId()))
.take(1)
.single();
}
И такого рода работы, хотя, похоже, я делаю то, что не должен (или, по крайней мере, они не чувствуют себя правильно), например, утечка FluxSink
, а затем введение значения через него при подписке, добавление идентификатор запроса, чтобы я мог отфильтровать правильный ответ. Кроме того, если ошибка возникает в serializingStream
, то это ломает все для всех, но я думаю, я мог бы попытаться изолировать ошибки, чтобы все продолжалось.
Вопрос в том, есть ли лучший способ сделать это, который не похож на операцию на открытом сердце.
Также, связанный вопрос для похожего сценария. Я думал об использовании Akka Persistence
для реализации источника событий и запуска его изнутри этого потока Reactor
. Я читал о Akka Streams
, которые позволяют обернуть актера, а затем есть несколько способов преобразовать это во что-то, что может быть связано с Reactor
(он же Publisher
или Subscriber
), но затем, если каждый запрос получает В своем потоке я эффективно теряю противодействие и рискую OOME из-за переполнения почтового ящика персистентного актера, поэтому я полагаю, что проблема относится к той же категории, что и описанная выше.