Сообщение Spring Cloud Stream не получено через очередь сообщений - PullRequest
0 голосов
/ 20 марта 2019

У меня есть 2 микросервиса (изображения и комментарии), которые взаимодействуют друг с другом (обнаружены службой обнаружения eureka), используя Spring Cloud Stream с брокером rabbit mq.Я хочу отправить сообщение из микросервиса изображений, чтобы прокомментировать микросервис.Проблема в том, что метод CommentController StreamListener 'save' не вызывается.

Изображение Microservice-> CommentController.java:

public CommentController(MeterRegistry meterRegistry) {
    this.meterRegistry = meterRegistry;
    this.flux = Flux.<Message<Comment>>create(
            emitter -> this.commentSink = emitter,
            FluxSink.OverflowStrategy.IGNORE)
            .publish()
            .autoConnect();
}
@PostMapping("/comments")
public Mono<String> addComment(Mono<Comment> newComment) {
    if (commentSink != null) {
        return newComment
            .map(comment -> {
                commentSink.next(MessageBuilder
                    .withPayload(comment)
                    .setHeader(MessageHeaders.CONTENT_TYPE,
                        MediaType.APPLICATION_JSON_VALUE)
                    .build());
                return comment;
            })
            .flatMap(comment -> {
                meterRegistry
                    .counter("comments.produced", "imageId", comment.getImageId())
                    .increment();
                return Mono.just("redirect:/");
            });
    } else {
        return Mono.just("redirect:/");
    }
}
    @StreamEmitter
@Output(Source.OUTPUT)
public void emit(FluxSender output) {
    output.send(this.flux);
}

Comment Microservice -> CommentService.java

@StreamListener
@Output(Processor.OUTPUT)
public Flux<Void> save(@Input(Processor.INPUT) Flux<Comment> newComment) {
    return repository
        .saveAll(newComment)
        .flatMap(comment -> {
            meterRegistry
                .counter("comments.consumed", "imageId", comment.getImageId())
                .increment();
            return Mono.empty();
        });
}

CommentService.java -> @Service @EnableBinding(Processor.class) public class CommentService { ....

Репозиторий, который я клонировал Глава 7 / часть 1

Микросервис изображения -> CommentController.java

Комментарий Микросервис -> CommentService.java

...