Как передать данные через абонентский контекст реактора? - PullRequest
0 голосов
/ 09 июля 2020

Я новичок в реакторе проекта, но у меня есть задача отправить некоторую информацию из контроллера упора пружины classi c в какую-то службу, которая взаимодействует с другой системой. Целый проект разработан с помощью проекта реактор. Вот мой контроллер отдыха:

@RestController
public class Controller {

@Autowired
Service service;

@PostMapping("/path")
public Mono<String> test(@RequestHeader Map<String, String> headers) throws Exception {
    testService.saveHeader(headers.get("header"));
    return service.getData();
}

А вот мой сервис:

@Service
public class Service {

private Mono<String> monoHeader;
private InteractionService interactor;

public Mono<String> getData() {
    return Mono.fromSupplier(() -> interactor.interact(monoHeader.block()));

}

public void saveHeader(String header) {
    String key = "header";
    monoHeader = Mono.just("")
            .flatMap( s -> Mono.subscriberContext()
                    .map( ctx -> s + ctx.get(key)))
            .subscriberContext(ctx -> ctx.put(key, header));
}

Это приемлемое решение?

Ответы [ 2 ]

1 голос
/ 10 июля 2020

Fisrt off, я не думаю, что вам здесь нужен Context. Полезно неявно передавать данные в Flux или Mono, которые вы не создаете (например, в тот, который драйвер базы данных создает для вас). Но здесь вы отвечаете за создание Mono<String>.

Служба saveHeader действительно чего-то добивается? Вызов кажется временным по своей природе: вы всегда сразу вызываете interactor с последним сохраненным заголовком. (там может быть побочный эффект, когда два параллельных вызова вашей конечной точки в конечном итоге перезаписывают заголовки друг друга).

Если вы действительно хотите сохранить заголовки, вы можете добавить список или карту в свой сервис, но наиболее логичным путем было бы добавить заголовок как параметр getData().

Это исключает поле monoHeader и метод saveHeader.

Затем сам getData: вы не Не нужно когда-либо block() на Mono, если вы хотите вернуть Mono. Добавление входного параметра позволит вам переписать метод как:

public Mono<String> getData(String header) {
    return Mono.fromSupplier(() -> interactor.interact(header));
}

И последнее, но не менее важное: блокировка. interactor кажется внешней службой или библиотекой, которая не является реактивной по своей природе. Если операция связана с некоторой задержкой (что, вероятно, имеет место) или блокируется более чем на несколько миллисекунд, тогда она должна выполняться в отдельном потоке.

Mono.fromSupplier выполняется в любом потоке подписки к нему. В этом случае Spring WebFlux подпишется на него, и он будет работать в потоке событий Netty oop. Если вы заблокируете этот поток, это означает, что никакой другой запрос не может быть обработан во всем приложении!

Итак, вы хотите выполнить interactor в выделенном потоке, что вы можете сделать, используя subscribeOn(Schedulers.boundedElastic()).

Всего:

@RestController
public class Controller {

    @Autowired
    Service service;

    @PostMapping("/path")
    public Mono<String> test(@RequestHeader Map<String, String> headers) throws Exception {
        return service.getData(headers.get("header"));
    }
}

@Service
public class Service {

    private InteractionService interactor;

    public Mono<String> getData(String header) {
        return Mono.fromSupplier(() -> interactor.interact(header))
            .subscribeOn(Schedulers.boundedElastic());
    }
}
0 голосов
/ 10 июля 2020

Как передать данные через контекст подписчика реактора?

Приемлемо ли это решение?

No.

Ваш Код метода saveHeader() является эквивалентом простого

public void saveHeader(String header) {
    monoHeader = Mono.just(header);
}

A subscriberContext необходим, если вы используете значение где-то еще - если моно построено где-то еще. В вашем случае (когда у вас есть весь код перед глазами в том же методе) просто используйте фактическое значение.


Кстати, есть много способов реализовать ваш getData() метод.

Один из них, предложенный Саймоном Басле для избавления от отдельного метода saveHeader().

Другой способ, если вам нужно сохранить поле monoHeader, может быть

public Mono<String> getData() {
    return monoHeader.publishOn(Schedulers.boundedElastic())
            .map(header -> interactor.interact(header));
}
...