Мы тестировали, как использовать EmitterProcessor в качестве шины событий. По сути, мы хотим косвенно сообщить Spring Controller, что что-то было успешно сделано, мы передаем уникальный ключ, который прослушивается в Spring-контроллере (поэтому, если для другого метода приходит другой ключ, мы его игнорируем).
public class RequestVerifier {
// constructor
public RequestVerifier(EmitterProcessor<String> emitterProcessor) {
this.emitterProcessor = emitterProcessor;
// put this subscription here for testing purposes
emitterProcessor.subscribe(value -> {
System.out.println(value);
});
}
public void runValue(RequestData data) {
requestService.accept(data);
FluxSink<String> sink = emitterProcessor.sink();
sink.next("SOME-KEY");
}
}
@Configuration
public class AppConfiguration {
@Bean(name = "flux-event-handler-event-bus-emitter")
public EmitterProcessor<String> createEventBusEmitter(){
EmitterProcessor<String> emitter = EmitterProcessor.create();
return emitter;
}
}
Проблема в том, что первое сообщение передается на emitterProcessor.subscribe
. Однако когда второе сообщение отправляется, когда runValue
вызывается во второй раз и далее, мы получаем java.lang.IllegalStateException: Spec. Rule 2.12 - Subscriber.onSubscribe MUST NOT be called more than once (based on object equality)
. Как мы разрешаем подписке принимать постоянный поток данных?
Также, если это уместно, мы планируем использовать это в обработчике событий аксона и передавать сообщение или ключ от обработчика событий, чтобы уведомить ожидающий контроллер результата для отправки обратно пользователю.
ОБНОВЛЕНИЕ
Также пробовал следующее, хотя не работало
@Bean(name = "flux-event-handler-event-bus-emitter")
public EmitterProcessor<String> createEventBusEmitter(){
EmitterProcessor<String> emitter = EmitterProcessor.create();
return emitter;
}
@Bean(name = "event-autoconnector")
public Flux<String> returnFlux(EmitterProcessor<String> emitter){
return emitter.publish().autoConnect();
}