Project Reactor Использование EmitterProcessor в качестве шины событий - PullRequest
1 голос
/ 22 апреля 2020

Мы тестировали, как использовать EmitterProcessor в качестве шины событий. По сути, мы хотим косвенно сообщить Spring Controller, что что-то было успешно сделано, мы передаем уникальный ключ, который прослушивается в Spring-контроллере (поэтому, если для другого метода приходит другой ключ, мы его игнорируем).

public class RequestVerifier {

    // constructor
    public RequestVerifier(EmitterProcessor<String> emitterProcessor) {
        this.emitterProcessor = emitterProcessor;
        // put this subscription here for testing purposes
        emitterProcessor.subscribe(value -> {
            System.out.println(value);
        });
    }

    public void runValue(RequestData data) {
        requestService.accept(data);
        FluxSink<String> sink = emitterProcessor.sink();
        sink.next("SOME-KEY");
    }

}
@Configuration
public class AppConfiguration {

    @Bean(name = "flux-event-handler-event-bus-emitter")
    public EmitterProcessor<String> createEventBusEmitter(){
        EmitterProcessor<String> emitter = EmitterProcessor.create();
        return emitter;
    }

}

Проблема в том, что первое сообщение передается на emitterProcessor.subscribe. Однако когда второе сообщение отправляется, когда runValue вызывается во второй раз и далее, мы получаем java.lang.IllegalStateException: Spec. Rule 2.12 - Subscriber.onSubscribe MUST NOT be called more than once (based on object equality). Как мы разрешаем подписке принимать постоянный поток данных?

Также, если это уместно, мы планируем использовать это в обработчике событий аксона и передавать сообщение или ключ от обработчика событий, чтобы уведомить ожидающий контроллер результата для отправки обратно пользователю.

ОБНОВЛЕНИЕ

Также пробовал следующее, хотя не работало

    @Bean(name = "flux-event-handler-event-bus-emitter")
    public EmitterProcessor<String> createEventBusEmitter(){
        EmitterProcessor<String> emitter = EmitterProcessor.create();
        return emitter;
    }

    @Bean(name = "event-autoconnector")
    public Flux<String> returnFlux(EmitterProcessor<String> emitter){
        return emitter.publish().autoConnect();
    }

1 Ответ

3 голосов
/ 22 апреля 2020

На своей игровой площадке Reactive CQRS Reo я играл с оболочкой реактивного API вокруг приложения Axon Framework.

Я думаю, вы можете найти то, что ищете, здесь https://github.com/stefanvozd/Reactive-CQRS, но имейте в виду, что он все еще находится в стадии разработки и в нем отсутствуют комментарии / документация ...

В зависимости от того, что вы пытаетесь сделать с этим потоком, приведено несколько примеров

  1. Пример контроллера покоя, который отправляет команду и ожидает материализации проекции и возвращает это представление пользователю неблокирующим образом. Для этого следует использовать запросы подписки
  2. Пример потока Reactive Event Bus, который вы можете внедрить в виде компонента и присоединить к потоку событий
  3. Пример обработчиков реактивных событий, которые сохраняют проекции с использованием реактивного драйвера БД r2db c, и у вас есть поддержка противодавления, если вы используете Axon Server.

Прямо сейчас, это все экспериментальные примеры, и если вы можете предоставить больше информации о том, чего вы пытаетесь достичь sh Я мог бы дать вам некоторые рекомендации

...