SpringBoot CloudStream (Кафка) и JPA - PullRequest
       95

SpringBoot CloudStream (Кафка) и JPA

0 голосов
/ 11 декабря 2018

Я создал приложение, которое получает запросы данных через kfka и использует JPA для сохранения данных, например, при каждой ошибке JPA

Exception in thread "user-4db12638-e58b-4728-9374-886ef20d0f31-StreamThread-1" org.springframework.dao.DataIntegrityViolationException: could not execute statement; SQL [n/a]; constraint [UK_sb8bbouer5wak8vyiiy4pf2bx]; nested exception is org.hibernate.exception.ConstraintViolationException: could not execute statement

Существует ли механизм, с помощью которого это можно предотвратить, и ошибки лучше обрабатываются,Я использую KStream Handler в моем StreamListener.

У моих свойств приложения есть

 spring.cloud.stream.bindings.input.destination=input
 spring.cloud.stream.bindings.input.group=user
 spring.cloud.stream.bindings.input.contentType=application/java-serialized-object
 spring.cloud.stream.bindings.input.consumer.header-mode=raw
 spring.cloud.stream.bindings.input.consumer.use-native-decoding=true
 spring.cloud.stream.kafka.bindings.input.consumer.enable-dlq=true
 spring.cloud.stream.kafka.bindings.input.dlqName=errors
 spring.cloud.stream.kafka.bindings.input.autoCommitOnError=true
 spring.cloud.stream.kafka.bindings.input.autoCommitOffset=true

, а у Processor -

@StreamListener(target = "input", copyHeaders = "true")
@SendTo("output")
public KStream<UUID, Event> process(KStream<UUID, Event> stream) {
    return stream
            .map((k, v) -> new KeyValue<UUID, Event>(k, processEntity(k,v)))
            .peek((k, v) -> {
                log.debug("Processing {} {} {}",k, v.getPayload(), v.getException());
            });
}

private Event processEntity(UUID key, Event event) {
        User user = userRepository.save((User) event.getPayload());
        event.setPayload(user);
    return event;
}

Когда я читаю документы весеннего облачного потока, обработка Excetpion;это приведет к тому, что любые исключения во время выполнения, в частности "DataIntegrityViolationException" , будут сериализованы в темы "ошибки" с использованием следующих

spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

, но я никогда не вижу ни одного полученного сообщенияпо;

@KafkaListener(id = "junit", topics = "errors")
public void receieveErrors(Message<?> errorMessage) {
      log.debug("DLQ:  {}" + errorMessage);
}

1 Ответ

0 голосов
/ 11 декабря 2018

Вы можете перехватить исключение в обработчике, используя API процессора, и отправить ошибочную запись в DLQ.См. этот раздел в документации для некоторых примеров.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...