«разделение» частей реактивного потока через несколько вызовов покоя - PullRequest
1 голос
/ 21 мая 2019

У меня есть Spring WebFlux контроллер:

@RestController
public class Controller
{
    @PostMapping("/doStuff")
    public Mono<Response> doStuff(@RequestBody Mono<Request> request)
    {
        ...
    }
}

Теперь, скажем, я хотел связать отдельные запросы , поступающие на этот контроллер от разных клиентов, к групповой обработке на основе некоторого свойства объекта Request.

дубль 1:

@PostMapping("/doStuff")
public Mono<Response> doStuff(@RequestBody Mono<Request> request)
{
    return request.flux()
                  .groupBy(r -> r.someProperty())
                  .flatMap(gf -> gf.map(r -> doStuff(r)));
}

Это не будет работать, потому что каждый вызов получит свой экземпляр потока. Весь вызов flux() не имеет смысла, всегда будет один объект Request, проходящий через поток, даже если многие из этих потоков запускаются одновременно в результате одновременных вызовов от клиентов. Я понимаю, что мне нужна некоторая часть потока, которая распределяется между всеми запросами, где я мог бы выполнить свою группировку, что привело меня к этому слегка сконструированному коду

Взять 2:

private AtomicReference<FluxSink<Request>> sink = new AtomicReference<>();
private Flux<Response> serializingStream;


public Controller()
{
    this.serializingStream =
        Flux.<Request>create(fluxSink -> sink.set(fluxSink), ERROR)
            .groupBy(r -> r.someProperty())
            .flatMap(gf -> gf.map(r -> doStuff(r)));
            .publish()
            .autoConnect();

    this.serializingStream.subscribe().dispose(); //dummy subscription to set the sink;
}

@PostMapping("/doStuff")
public Mono<Response> doStuff(@RequestBody Request request)
{
    req.setReqId(UUID.randomUUID().toString());
    return
        serializingStream
            .doOnSubscribe(__ -> sink.get().next(req))
            .filter(resp -> resp.getReqId().equals(req.getReqId()))
            .take(1)
            .single();
}

И такого рода работы, хотя, похоже, я делаю то, что не должен (или, по крайней мере, они не чувствуют себя правильно), например, утечка FluxSink, а затем введение значения через него при подписке, добавление идентификатор запроса, чтобы я мог отфильтровать правильный ответ. Кроме того, если ошибка возникает в serializingStream, то это ломает все для всех, но я думаю, я мог бы попытаться изолировать ошибки, чтобы все продолжалось.

Вопрос в том, есть ли лучший способ сделать это, который не похож на операцию на открытом сердце.

Также, связанный вопрос для похожего сценария. Я думал об использовании Akka Persistence для реализации источника событий и запуска его изнутри этого потока Reactor. Я читал о Akka Streams, которые позволяют обернуть актера, а затем есть несколько способов преобразовать это во что-то, что может быть связано с Reactor (он же Publisher или Subscriber), но затем, если каждый запрос получает В своем потоке я эффективно теряю противодействие и рискую OOME из-за переполнения почтового ящика персистентного актера, поэтому я полагаю, что проблема относится к той же категории, что и описанная выше.

...