Я работаю с простым набором микросервисов, и во время обработки сообщений и сохранения значений происходят две странные вещи.
review_1 | 2020-01-29 16:55:52.867 ERROR 1 --- [.reviewsGroup-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking com.geborskimateusz.microservices.core.review.service.MessageProcessor#process[1 args]; nested exception is com.geborskimateusz.util.exceptions.InvalidInputException: Duplicate key, Movie Id: 333, Review Id:0, failedMessage=GenericMessage [payload=byte[197], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=reviews, amqp_deliveryTag=3, deliveryAttempt=3, amqp_consumerQueue=reviews.reviewsGroup, amqp_redelivered=false, amqp_receivedRoutingKey=reviews, amqp_timestamp=Wed Jan 29 16:55:49 GMT 2020, amqp_messageId=95b4bae5-b5ab-12a1-462a-567252c04778, id=b88df7f0-b6ad-b26b-5280-a80a4396efd7, amqp_consumerTag=amq.ctag-NcAue9uq8t8rQC_EmY9cZQ, contentType=application/json, timestamp=1580316951325}]
review_1 | at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:63)
review_1 | at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
review_1 | at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
review_1 | at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
review_1 | at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
review_1 | at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
review_1 | at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
review_1 | at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:451)
review_1 | at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:400)
review_1 | at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
review_1 | at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
review_1 | at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
review_1 | at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
review_1 | at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
review_1 | at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:58)
review_1 | at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:211)
review_1 | at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
review_1 | at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180)
review_1 | at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:208)
review_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1477)
review_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1400)
review_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1387)
review_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1366)
review_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:848)
review_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:832)
review_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:78)
review_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1073)
review_1 | at java.base/java.lang.Thread.run(Thread.java:835)
review_1 | Caused by: com.geborskimateusz.util.exceptions.InvalidInputException: Duplicate key, Movie Id: 333, Review Id:0
review_1 | at com.geborskimateusz.microservices.core.review.service.BaseReviewService.createReview(BaseReviewService.java:58)
review_1 | at com.geborskimateusz.microservices.core.review.service.MessageProcessor.process(MessageProcessor.java:34)
review_1 | at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
review_1 | at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
review_1 | at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
review_1 | at java.base/java.lang.reflect.Method.invoke(Method.java:567)
review_1 | at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189)
review_1 | at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119)
review_1 | at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
review_1 | ... 27 more
Во-первых, когда я подготавливаю Object для отправки thoutgth сообщения, reviewId равно некоторому значению, например 437, где, когда сообщение получено в обработчике сообщений, это, и только это значение каким-то образом теряется, и оно просто 0, я буду представлять его с помощью кода и журналов:
Метод, который отправит сообщение, выглядит так:
@Override
public Review createReview(Review review) {
log.info("MovieCompositeIntegration.createReview(Review review), passed argument: {}", review.toString());
messageSources.outputReviews()
.send(MessageBuilder.withPayload(
new Event<>(Event.Type.CREATE, review.getMovieId(), review)
).build());
return review;
}
Вы можете увидеть review.toString, которое в журналах имеет значение reviewId: Журнал из составного сервиса:
movie-composite_1 | 2020-01-29 16:01:32.327 INFO 1 --- [or-http-epoll-2] c.g.m.c.m.s.MovieCompositeIntegration : MovieCompositeIntegration.createReview(Review review), passed argument: Review{movieId=333, reviewId=451, author='author 1', subject='subject 1', content='content 1', serviceAddress='null'}
Этот метод отправляет сообщение на просмотр микросервиса, где событие CREATE выглядит следующим образом:
@Slf4j
@EnableBinding(Sink.class)
public class MessageProcessor {
private final ReviewService reviewService;
public MessageProcessor(ReviewService reviewService) {
this.reviewService = reviewService;
}
@StreamListener(target = Sink.INPUT)
public void process(Event<Integer, Review> event) {
log.info("Process message created at {}...", event.getEventCreatedAt());
log.info("Process message body: {}", event.toString());
switch (event.getEventType()) {
case CREATE:
Review review = event.getData();
log.info("Create review with ID: {}/{}", review.getMovieId(), review.getReviewId());
log.info(review.toString());
reviewService.createReview(review);
break;
default:
String errorMessage = "Incorrect event type: " + event.getEventType() + ", expected a CREATE or DELETE event";
log.warn(errorMessage);
throw new EventProcessingException(errorMessage);
}
log.info("Message processing done!");
}
}
, и здесь отсутствует это и только это значение reviewId, журналы похожи :
review_1 | 2020-01-29 16:01:32.433 INFO 1 --- [.reviewsGroup-1] c.g.m.c.review.service.MessageProcessor : Process message created at 2020-01-29T16:01:32.319482...
review_1 | 2020-01-29 16:01:32.433 INFO 1 --- [.reviewsGroup-1] c.g.m.c.review.service.MessageProcessor : Process message body: Event{eventType=CREATE, key=333, data=Review{movieId=333, reviewId=0, author='author 1', subject='null', content='content 1', serviceAddress='null'}, eventCreatedAt=2020-01-29T16:01:32.319482}
review_1 | 2020-01-29 16:01:32.434 INFO 1 --- [.reviewsGroup-1] c.g.m.c.review.service.MessageProcessor : Create review with ID: 333/0
review_1 | 2020-01-29 16:01:32.434 INFO 1 --- [.reviewsGroup-1] c.g.m.c.review.service.MessageProcessor : Review{movieId=333, reviewId=0, author='author 1', subject='null', content='content 1', serviceAddress='null'}
Из-за этого (вероятно) при сохранении в базу данных выдается эта ошибка:
review_1 | 2020-01-29 16:01:34.151 ERROR 1 --- [.reviewsGroup-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking com.geborskimateusz.microservices.core.review.service.MessageProcessor#process[1 args]; nested exception is com.geborskimateusz.util.exceptions.InvalidInputException: Duplicate key, Movie Id: 333, Review Id:0, failedMessage=GenericMessage [payload=byte[197], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=reviews, amqp_deliveryTag=2, deliveryAttempt=3, amqp_consumerQueue=reviews.reviewsGroup, amqp_redelivered=false, amqp_receivedRoutingKey=reviews, amqp_timestamp=Wed Jan 29 16:01:32 GMT 2020, amqp_messageId=b9f05c16-67a5-a823-de74-5f519f14d8b3, id=4ddedcea-3938-0e03-0ac8-536e5149f50b, amqp_consumerTag=amq.ctag-GaxrablC0mlEtccmO7yVLw, contentType=application/json, timestamp=1580313692567}]
Более того, даже если reviewId каким-то образом изменен с конкретного значения на 0 по умолчанию поведение должно распознавать эту сущность как новую, но возникает исключение. Что мне здесь не хватает? Я борюсь с этим около недели, у меня нет идей, любая помощь будет оценена. Я прилагаю также дополнительный код события и обзора:
Обзор:
@Getter
@Setter
@AllArgsConstructor
@Builder
public class Review {
private final int movieId;
private final int reviewId;
private final String author;
private final String subject;
private final String content;
private String serviceAddress;
public Review() {
movieId = 0;
reviewId = 0;
author = null;
subject = null;
content = null;
serviceAddress = null;
}
@Override
public String toString() {
return "Review{" +
"movieId=" + movieId +
", reviewId=" + reviewId +
", author='" + author + '\'' +
", subject='" + subject + '\'' +
", content='" + content + '\'' +
", serviceAddress='" + serviceAddress + '\'' +
'}';
}
}
Просмотр объекта:
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Getter
@Setter
@Entity
@Table(name = "reviews", indexes = { @Index(name = "reviews_unique_idx", unique = true, columnList = "movieId,reviewId") })
public class ReviewEntity {
@Id
@GeneratedValue
private Integer id;
@Version
private Integer version;
private Integer movieId;
private Integer reviewId;
private String author;
private String subject;
private String content;
private String serviceAddress;
public ReviewEntity(Integer movieId, Integer reviewId, String author, String subject, String content, String serviceAddress) {
this.movieId = movieId;
this.reviewId = reviewId;
this.author = author;
this.subject = subject;
this.content = content;
this.serviceAddress = serviceAddress;
}
}
и Событие:
@Getter
public class Event<K, T> {
public enum Type {CREATE, DELETE}
private Event.Type eventType;
private K key;
private T data;
private LocalDateTime eventCreatedAt;
public Event() {
this.eventType = null;
this.key = null;
this.data = null;
this.eventCreatedAt = null;
}
public Event(Type eventType, K key, T data) {
this.eventType = eventType;
this.key = key;
this.data = data;
this.eventCreatedAt = now();
}
@Override
public String toString() {
return "Event{" +
"eventType=" + eventType +
", key=" + key +
", data=" + data +
", eventCreatedAt=" + eventCreatedAt +
'}';
}
}