Я создал приложение, которое получает запросы данных через kfka и использует JPA для сохранения данных, например, при каждой ошибке JPA
Exception in thread "user-4db12638-e58b-4728-9374-886ef20d0f31-StreamThread-1" org.springframework.dao.DataIntegrityViolationException: could not execute statement; SQL [n/a]; constraint [UK_sb8bbouer5wak8vyiiy4pf2bx]; nested exception is org.hibernate.exception.ConstraintViolationException: could not execute statement
Существует ли механизм, с помощью которого это можно предотвратить, и ошибки лучше обрабатываются,Я использую KStream Handler в моем StreamListener.
У моих свойств приложения есть
spring.cloud.stream.bindings.input.destination=input
spring.cloud.stream.bindings.input.group=user
spring.cloud.stream.bindings.input.contentType=application/java-serialized-object
spring.cloud.stream.bindings.input.consumer.header-mode=raw
spring.cloud.stream.bindings.input.consumer.use-native-decoding=true
spring.cloud.stream.kafka.bindings.input.consumer.enable-dlq=true
spring.cloud.stream.kafka.bindings.input.dlqName=errors
spring.cloud.stream.kafka.bindings.input.autoCommitOnError=true
spring.cloud.stream.kafka.bindings.input.autoCommitOffset=true
, а у Processor -
@StreamListener(target = "input", copyHeaders = "true")
@SendTo("output")
public KStream<UUID, Event> process(KStream<UUID, Event> stream) {
return stream
.map((k, v) -> new KeyValue<UUID, Event>(k, processEntity(k,v)))
.peek((k, v) -> {
log.debug("Processing {} {} {}",k, v.getPayload(), v.getException());
});
}
private Event processEntity(UUID key, Event event) {
User user = userRepository.save((User) event.getPayload());
event.setPayload(user);
return event;
}
Когда я читаю документы весеннего облачного потока, обработка Excetpion;это приведет к тому, что любые исключения во время выполнения, в частности "DataIntegrityViolationException" , будут сериализованы в темы "ошибки" с использованием следующих
spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
, но я никогда не вижу ни одного полученного сообщенияпо;
@KafkaListener(id = "junit", topics = "errors")
public void receieveErrors(Message<?> errorMessage) {
log.debug("DLQ: {}" + errorMessage);
}