У меня есть 2 микросервиса (изображения и комментарии), которые взаимодействуют друг с другом (обнаружены службой обнаружения eureka), используя Spring Cloud Stream с брокером rabbit mq.Я хочу отправить сообщение из микросервиса изображений, чтобы прокомментировать микросервис.Проблема в том, что метод CommentController StreamListener 'save' не вызывается.
Изображение Microservice-> CommentController.java:
public CommentController(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.flux = Flux.<Message<Comment>>create(
emitter -> this.commentSink = emitter,
FluxSink.OverflowStrategy.IGNORE)
.publish()
.autoConnect();
}
@PostMapping("/comments")
public Mono<String> addComment(Mono<Comment> newComment) {
if (commentSink != null) {
return newComment
.map(comment -> {
commentSink.next(MessageBuilder
.withPayload(comment)
.setHeader(MessageHeaders.CONTENT_TYPE,
MediaType.APPLICATION_JSON_VALUE)
.build());
return comment;
})
.flatMap(comment -> {
meterRegistry
.counter("comments.produced", "imageId", comment.getImageId())
.increment();
return Mono.just("redirect:/");
});
} else {
return Mono.just("redirect:/");
}
}
@StreamEmitter
@Output(Source.OUTPUT)
public void emit(FluxSender output) {
output.send(this.flux);
}
Comment Microservice -> CommentService.java
@StreamListener
@Output(Processor.OUTPUT)
public Flux<Void> save(@Input(Processor.INPUT) Flux<Comment> newComment) {
return repository
.saveAll(newComment)
.flatMap(comment -> {
meterRegistry
.counter("comments.consumed", "imageId", comment.getImageId())
.increment();
return Mono.empty();
});
}
CommentService.java -> @Service
@EnableBinding(Processor.class)
public class CommentService { ....
Репозиторий, который я клонировал Глава 7 / часть 1
Микросервис изображения -> CommentController.java
Комментарий Микросервис -> CommentService.java