Я использовал spring-kafka для своего проекта и в настоящее время сталкиваюсь с проблемой.
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.polubentcev.messenger.model.subscription.SubscriptionsRequest] for GenericMessage [payload={"userId":"4","pagination":{"offset":0,"quantity":10}}, headers={kafka_offset=23, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3817b042, kafka_correlationId=[B@13cf505b, kafka_timestampType=CREATE_TIME, kafka_replyTopic=[B@58355ff, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=07, kafka_receivedTimestamp=1548552030765, __TypeId__=[B@7480d2ef}]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.1.3.RELEASE.jar:5.1.3.RELEASE]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.1.3.RELEASE.jar:5.1.3.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-5.1.3.RELEASE.jar:5.1.3.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.1.3.RELEASE.jar:5.1.3.RELEASE]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
... 14 common frames omitted
Такое исключение выдается, когда на мой микросервис приходит сообщение.
Вот моя конфигурация:
private final Map<String, Object> consumerProps;
public KafkaConsumerConfiguration(@Value("${kafka.bootstrap-servers}") final String bootstrapServers,
@Value("${kafka.consumer.group-id}") final String consumerGroupId) {
this.consumerProps = Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class
);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Pair<String, String>>>
kafkaSubscriptionListenerContainerFactory(final ReplyingKafkaTemplate<String, Pair<String, String>, UsersSubscription> kafkaTemplate) {
final DefaultKafkaConsumerFactory<String, Pair<String, String>> cf = new DefaultKafkaConsumerFactory<>(
consumerProps, new StringDeserializer(), new JsonDeserializer<>()
);
final ConcurrentKafkaListenerContainerFactory<String, Pair<String, String>> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf);
factory.setReplyTemplate(kafkaTemplate);
return factory;
}
Это очень странно, потому что это должно работать.Я попытался решить это самостоятельно, но безуспешно, потому что не знал, где копать глубже.
Не могли бы вы помочь мне с этим?
Кстати, я пытался конвертировать в отладчикеполезная нагрузка String
, использующая новый ObjectMapper<>().readValue()
, и он работает, также все SubscriptionsRequest
содержит @JsonCreator
и все необходимые вещи для десериализации.
ВОЗМОЖНОЕ РЕШЕНИЕ: Я добавил конструктор SubscriptionsRequest(String str)
, где я создаю экземпляриспользуя ObjectMapper
.Это работает, но уродливо.