MessageProcessor теряет значения при обработке -> InvalidInputException: дубликат ключа, Mov ie Id: 333, Review Id: 0 - PullRequest
1 голос
/ 29 января 2020

Я работаю с простым набором микросервисов, и во время обработки сообщений и сохранения значений происходят две странные вещи.

review_1           | 2020-01-29 16:55:52.867 ERROR 1 --- [.reviewsGroup-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking com.geborskimateusz.microservices.core.review.service.MessageProcessor#process[1 args]; nested exception is com.geborskimateusz.util.exceptions.InvalidInputException: Duplicate key, Movie Id: 333, Review Id:0, failedMessage=GenericMessage [payload=byte[197], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=reviews, amqp_deliveryTag=3, deliveryAttempt=3, amqp_consumerQueue=reviews.reviewsGroup, amqp_redelivered=false, amqp_receivedRoutingKey=reviews, amqp_timestamp=Wed Jan 29 16:55:49 GMT 2020, amqp_messageId=95b4bae5-b5ab-12a1-462a-567252c04778, id=b88df7f0-b6ad-b26b-5280-a80a4396efd7, amqp_consumerTag=amq.ctag-NcAue9uq8t8rQC_EmY9cZQ, contentType=application/json, timestamp=1580316951325}]
review_1           |    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:63)
review_1           |    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
review_1           |    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
review_1           |    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
review_1           |    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
review_1           |    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
review_1           |    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
review_1           |    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:451)
review_1           |    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:400)
review_1           |    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
review_1           |    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
review_1           |    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
review_1           |    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
review_1           |    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
review_1           |    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:58)
review_1           |    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:211)
review_1           |    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
review_1           |    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
review_1           |    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:208)
review_1           |    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1477)
review_1           |    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1400)
review_1           |    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1387)
review_1           |    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1366)
review_1           |    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:848)
review_1           |    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:832)
review_1           |    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:78)
review_1           |    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1073)
review_1           |    at java.base/java.lang.Thread.run(Thread.java:835)
review_1           | Caused by: com.geborskimateusz.util.exceptions.InvalidInputException: Duplicate key, Movie Id: 333, Review Id:0
review_1           |    at com.geborskimateusz.microservices.core.review.service.BaseReviewService.createReview(BaseReviewService.java:58)
review_1           |    at com.geborskimateusz.microservices.core.review.service.MessageProcessor.process(MessageProcessor.java:34)
review_1           |    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
review_1           |    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
review_1           |    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
review_1           |    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
review_1           |    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189)
review_1           |    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119)
review_1           |    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
review_1           |    ... 27 more

Во-первых, когда я подготавливаю Object для отправки thoutgth сообщения, reviewId равно некоторому значению, например 437, где, когда сообщение получено в обработчике сообщений, это, и только это значение каким-то образом теряется, и оно просто 0, я буду представлять его с помощью кода и журналов:

Метод, который отправит сообщение, выглядит так:

 @Override
    public Review createReview(Review review) {
        log.info("MovieCompositeIntegration.createReview(Review review), passed argument: {}", review.toString());

        messageSources.outputReviews()
                .send(MessageBuilder.withPayload(
                        new Event<>(Event.Type.CREATE, review.getMovieId(), review)
                ).build());

        return review;
    } 

Вы можете увидеть review.toString, которое в журналах имеет значение reviewId: Журнал из составного сервиса:

movie-composite_1  | 2020-01-29 16:01:32.327  INFO 1 --- [or-http-epoll-2] c.g.m.c.m.s.MovieCompositeIntegration    : MovieCompositeIntegration.createReview(Review review), passed argument: Review{movieId=333, reviewId=451, author='author 1', subject='subject 1', content='content 1', serviceAddress='null'}

Этот метод отправляет сообщение на просмотр микросервиса, где событие CREATE выглядит следующим образом:

@Slf4j
@EnableBinding(Sink.class)
public class MessageProcessor {

    private final ReviewService reviewService;

    public MessageProcessor(ReviewService reviewService) {
        this.reviewService = reviewService;
    }

    @StreamListener(target = Sink.INPUT)
    public void process(Event<Integer, Review> event) {

        log.info("Process message created at {}...", event.getEventCreatedAt());
        log.info("Process message body: {}", event.toString());

        switch (event.getEventType()) {

            case CREATE:
                Review review = event.getData();
                log.info("Create review with ID: {}/{}", review.getMovieId(), review.getReviewId());
                log.info(review.toString());
                reviewService.createReview(review);
                break;

            default:
                String errorMessage = "Incorrect event type: " + event.getEventType() + ", expected a CREATE or DELETE event";
                log.warn(errorMessage);
                throw new EventProcessingException(errorMessage);
        }

        log.info("Message processing done!");
    }
}

, и здесь отсутствует это и только это значение reviewId, журналы похожи :

    review_1           | 2020-01-29 16:01:32.433  INFO 1 --- [.reviewsGroup-1] c.g.m.c.review.service.MessageProcessor  : Process message created at 2020-01-29T16:01:32.319482...
    review_1           | 2020-01-29 16:01:32.433  INFO 1 --- [.reviewsGroup-1] c.g.m.c.review.service.MessageProcessor  : Process message body: Event{eventType=CREATE, key=333, data=Review{movieId=333, reviewId=0, author='author 1', subject='null', content='content 1', serviceAddress='null'}, eventCreatedAt=2020-01-29T16:01:32.319482}
    review_1           | 2020-01-29 16:01:32.434  INFO 1 --- [.reviewsGroup-1] c.g.m.c.review.service.MessageProcessor  : Create review with ID: 333/0
    review_1           | 2020-01-29 16:01:32.434  INFO 1 --- [.reviewsGroup-1] c.g.m.c.review.service.MessageProcessor  : Review{movieId=333, reviewId=0, author='author 1', subject='null', content='content 1', serviceAddress='null'}

Из-за этого (вероятно) при сохранении в базу данных выдается эта ошибка:

review_1           | 2020-01-29 16:01:34.151 ERROR 1 --- [.reviewsGroup-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking com.geborskimateusz.microservices.core.review.service.MessageProcessor#process[1 args]; nested exception is com.geborskimateusz.util.exceptions.InvalidInputException: Duplicate key, Movie Id: 333, Review Id:0, failedMessage=GenericMessage [payload=byte[197], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=reviews, amqp_deliveryTag=2, deliveryAttempt=3, amqp_consumerQueue=reviews.reviewsGroup, amqp_redelivered=false, amqp_receivedRoutingKey=reviews, amqp_timestamp=Wed Jan 29 16:01:32 GMT 2020, amqp_messageId=b9f05c16-67a5-a823-de74-5f519f14d8b3, id=4ddedcea-3938-0e03-0ac8-536e5149f50b, amqp_consumerTag=amq.ctag-GaxrablC0mlEtccmO7yVLw, contentType=application/json, timestamp=1580313692567}]

Более того, даже если reviewId каким-то образом изменен с конкретного значения на 0 по умолчанию поведение должно распознавать эту сущность как новую, но возникает исключение. Что мне здесь не хватает? Я борюсь с этим около недели, у меня нет идей, любая помощь будет оценена. Я прилагаю также дополнительный код события и обзора:

Обзор:

@Getter
@Setter
@AllArgsConstructor
@Builder
public class Review {
    private final int movieId;
    private final int reviewId;
    private final String author;
    private final String subject;
    private final String content;
    private String serviceAddress;

    public Review() {
        movieId = 0;
        reviewId = 0;
        author = null;
        subject = null;
        content = null;
        serviceAddress = null;
    }

    @Override
    public String toString() {
        return "Review{" +
                "movieId=" + movieId +
                ", reviewId=" + reviewId +
                ", author='" + author + '\'' +
                ", subject='" + subject + '\'' +
                ", content='" + content + '\'' +
                ", serviceAddress='" + serviceAddress + '\'' +
                '}';
    }
} 

Просмотр объекта:

@NoArgsConstructor
@AllArgsConstructor
@Builder
@Getter
@Setter
@Entity
@Table(name = "reviews", indexes = { @Index(name = "reviews_unique_idx", unique = true, columnList = "movieId,reviewId") })
public class ReviewEntity {

    @Id
    @GeneratedValue
    private Integer id;

    @Version
    private Integer version;

    private Integer movieId;
    private Integer reviewId;
    private String author;
    private String subject;
    private String content;
    private String serviceAddress;

    public ReviewEntity(Integer movieId, Integer reviewId, String author, String subject, String content, String serviceAddress) {
        this.movieId = movieId;
        this.reviewId = reviewId;
        this.author = author;
        this.subject = subject;
        this.content = content;
        this.serviceAddress = serviceAddress;
    }
}

и Событие:

@Getter
public class Event<K, T> {

    public enum Type {CREATE, DELETE}

    private Event.Type eventType;
    private K key;
    private T data;
    private LocalDateTime eventCreatedAt;

    public Event() {
        this.eventType = null;
        this.key = null;
        this.data = null;
        this.eventCreatedAt = null;
    }

    public Event(Type eventType, K key, T data) {
        this.eventType = eventType;
        this.key = key;
        this.data = data;
        this.eventCreatedAt = now();
    }

    @Override
    public String toString() {
        return "Event{" +
                "eventType=" + eventType +
                ", key=" + key +
                ", data=" + data +
                ", eventCreatedAt=" + eventCreatedAt +
                '}';
    }
}
...