У меня есть шаблон, который выглядит следующим образом:
@Autowired
private ReplyingKafkaTemplate<ItemId, MessageBDto, MessageBDto> xxx2ReplyingKafkaTemplate;
Мой метод отправки оболочки выглядит следующим образом:
public RequestReplyFuture<ItemId, MessageBDto, MessageBDto> sendAndReceiveMessageB(MessageBDto message) {
ProducerRecord<ItemId, MessageBDto> producerRecord = new ProducerRecord<>(KafkaTopicConfig.xxx2_TOPIC, new ItemId(message.getCount()), message);
producerRecord.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, KafkaTopicConfig.xxx2_REPLY_TOPIC.getBytes()));
return this.xxx2ReplyingKafkaTemplate.sendAndReceive(producerRecord);
}
И это мой слушатель:
@SendTo
@KafkaListener(topics=KafkaTopicConfig.xxx2_TOPIC, containerFactory="xxx2ListenerContainerFactory")
public MessageBDto xxx2Listener(ConsumerRecord<ItemId, MessageBDto> message) {
System.out.println("xxx2(value): " + message.value().getMessage() + ", " + message.value().getCount());
message.value().setCount(message.value().getCount() * 2);
return message.value();
}
Разве это не должно отправлять Key = ItemId, Value = MessageBDto и получать ключ в слушателе?
Кажется, что слушатель не получает ключ и / или кажется, что он другой Экземпляр MessageBDto.
Не понимаю ли я, как это должно работать?
РЕДАКТИРОВАТЬ:
БИНЫ ПРОИЗВОДИТЕЛЯ:
@Bean
public ProducerFactory<ItemId, MessageBDto> xxx2ProducerFactory() {
return new DefaultKafkaProducerFactory<ItemId, MessageBDto>(super.producerConfigs(),
new JsonSerializer<ItemId>(),
new JsonSerializer<MessageBDto>());
}
@Bean
public ConsumerFactory<ItemId, MessageBDto> xxx2ConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(super.consumerConfigs(),
trustingDeserializer(ItemId.class),
trustingDeserializer(MessageBDto.class));
}
@Bean
public KafkaMessageListenerContainer<ItemId, MessageBDto> dtms2MessageListenerContainer() {
return new KafkaMessageListenerContainer<>(xxx2ConsumerFactory(),
new ContainerProperties(KafkaTopicConfig.xxx2_REPLY_TOPIC));
}
@Bean
public ReplyingKafkaTemplate<ItemId, MessageBDto, MessageBDto> xxx2ReplyingKafkaTemplate() {
return new ReplyingKafkaTemplate<>(xxx2ProducerFactory(), xxx2MessageListenerContainer());
}
private <T> JsonDeserializer<T> trustingDeserializer(Class<T> targetType) {
JsonDeserializer<T> deserializer = new JsonDeserializer<>(targetType);
deserializer.addTrustedPackages("*");
return deserializer;
}
ПОТРЕБИТЕЛЬСКИЕ БИНЫ:
@Bean
public KafkaTemplate<ItemId, MessageBDto> xxx2KafkaTemplate() {
return new KafkaTemplate<>(xxx2ProducerFactory());
}
@Bean
public KafkaListenerContainerFactory<ItemId, MessageBDto> xxxListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<ItemId, MessageBDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(dtms2ConsumerFactory());
factory.setReplyTemplate(xxx2KafkaTemplate());
return factory;
}
Когда я смотрю в отладчике моего слушателя, он показывает, что ключ является пустым экземпляром MessageBDto ???
Версии: