Я использую spring-kafka-2.2.9.RELEASE
и kafka_2.12-2.3.0
. Я пытался получить original payload
в теме .DLT, но все, что я получаю, это "ноль". Я уверен, что это можно сделать с помощью ErrorHandlingDeserializer2
, SeekToCurrentErrorHandler
и DeadLetterPublishingRecoverer
, но я не уверен, чего мне не хватает.
Производитель и потребитель
Producer:
@Autowired
private ObjectMapper objectMapper;
@Bean
public ProducerFactory<Object, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put("acks","all");
return new DefaultKafkaProducerFactory<Object, Object>(props, new JsonSerializer<Object>(objectMapper), new JsonSerializer<>());
//return new DefaultKafkaProducerFactory<Object, Object>(props, new JsonSerializer<Object>(objectMapper), new JsonSerializer<Object>(objectMapper));
}
@Bean
public KafkaTemplate<Object, Object> kafkaTemplate() {
return new KafkaTemplate<Object, Object>(producerFactory());
}
Consumer:
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
@Autowired
private ObjectMapper objectMapper;
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>(objectMapper);
jsonDeserializer.addTrustedPackages("*");
ErrorHandlingDeserializer2<Object> errorHandlingDeserializerKey = new ErrorHandlingDeserializer2<>(jsonDeserializer);
ErrorHandlingDeserializer2<Object> errorHandlingDeserializerValue = new ErrorHandlingDeserializer2<>(jsonDeserializer);
return new DefaultKafkaConsumerFactory<>(props, errorHandlingDeserializerKey, errorHandlingDeserializerValue);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), 3));
return factory;
}
Я вижу "null" в теме. DLT:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic.DLT --from-beginning
null
null
Заранее благодарен за любую помощь.