рабочая среда : пружинный ботинок 2, data jpa, kafka, spock, gradle. Вопрос: Мне действительно трудно писать тестовые примеры для этого, потому что KafkaListenerErrorHanlder является функциональным интерфейсом и является частью конфигурации Spring.Наряду с этим мне интересно, что исключение выдается, когда база данных не работает, когда приложение работает, так как мне нужно сбросить ошибки БД в контейнер.Выдает ли пружина jpa данных DataAccessException для этого?
@ Открытый класс конфигурации TransactionConsumerConfiguration расширяет KafkaConsumerConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(TransactionConsumerConfiguration.class);
@Value("${kafka.consumer.transaction.client.id}")
private String transactionClientId;
@Bean
public KafkaListenerErrorHandler transactionListenerErrorHandler() {
return (m, e) -> {
if (e.getCause() instanceof ParseException) {
LOGGER.error("Error while parsing. Failed Message Payload : {}, \n Exception : {}", m.getPayload(),
e.getMostSpecificCause());
} else if (e.getCause() instanceof IOException) {
LOGGER.error("Error while transforming. Failed Message Payload : {}, \n Exception : {}",
m.getPayload(), e.getMostSpecificCause());
} else if (e.getCause() instanceof DataAccessException) {
LOGGER.error("Error while performing database save/update opeartion. Failed Message Payload : {}, \n Exception : {}",
m.getPayload(), e.getMostSpecificCause());
} else {
throw new TransactionListenerErrorHandlerUnhandledException("Transaction Listener error handler failed to handle the error",
e); // custom exception
}
return null;
};
}
public ConsumerFactory<String, AvroTransactionEvent> transactionConsumerFactory(String groupId) {
Map<String, Object> props = getGenericKafkaConsumerProperties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, AvroTransactionEvent> transactionKafkaListenerContainerFactory(
TransactionFilterStrategy transactionFilterStrategy) {
ConcurrentKafkaListenerContainerFactory<String, AvroTransactionEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(transactionConsumerFactory(transactionClientId));
factory.setRecordFilterStrategy(transactionFilterStrategy);
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
factory.getContainerProperties().setCommitLogLevel(Level.INFO);
return factory;
}
}
@ Открытый класс службы TransactionListener {
private static final Logger LOGGER = LoggerFactory.getLogger(TransactionListener.class);
@Autowired
TransactionService transactionService;
@Autowired
TransactionObjectMapper transactionObjectMapper;
@KafkaListener(topics = "${kafka.integration.transaction.topic}", containerFactory = "transactionKafkaListenerContainerFactory", errorHandler = "transactionListenerErrorHandler")
public void transactionListener(GenericRecord genericRecord) throws ParseException, IOException {
List<Transaction> transaction = null;
LOGGER.info("Received transaction : {}", genericRecord);
depositoryTransaction = transactionObjectMapper.convertAvroToTransactionDataModelPojo(genericRecord.toString());
transactionService.create(transaction);
}
}