Как написать Интеграционные / модульные тесты для KafkaListenerErrorHandler, которые я недавно добавил в свой код, Некоторая помощь в том, что нужно проверить и макетировать / заглушку - PullRequest
0 голосов
/ 16 февраля 2019

рабочая среда : пружинный ботинок 2, data jpa, kafka, spock, gradle. Вопрос: Мне действительно трудно писать тестовые примеры для этого, потому что KafkaListenerErrorHanlder является функциональным интерфейсом и является частью конфигурации Spring.Наряду с этим мне интересно, что исключение выдается, когда база данных не работает, когда приложение работает, так как мне нужно сбросить ошибки БД в контейнер.Выдает ли пружина jpa данных DataAccessException для этого?

@ Открытый класс конфигурации TransactionConsumerConfiguration расширяет KafkaConsumerConfiguration {

private static final Logger LOGGER = LoggerFactory.getLogger(TransactionConsumerConfiguration.class);

@Value("${kafka.consumer.transaction.client.id}")
private String transactionClientId;

@Bean
public KafkaListenerErrorHandler transactionListenerErrorHandler() {
    return (m, e) -> {
        if (e.getCause() instanceof ParseException) {
            LOGGER.error("Error while parsing. Failed Message Payload : {}, \n Exception : {}", m.getPayload(),
                    e.getMostSpecificCause());
        } else if (e.getCause() instanceof IOException) {
            LOGGER.error("Error while transforming. Failed Message Payload : {}, \n Exception : {}",
                    m.getPayload(), e.getMostSpecificCause());
        } else if (e.getCause() instanceof DataAccessException) {
            LOGGER.error("Error while performing database save/update opeartion. Failed Message Payload : {}, \n Exception : {}",
                    m.getPayload(), e.getMostSpecificCause());
        } else {
            throw new TransactionListenerErrorHandlerUnhandledException("Transaction Listener error handler failed to handle the error",
                    e); // custom exception
        }
        return null;
    };
}

public ConsumerFactory<String, AvroTransactionEvent> transactionConsumerFactory(String groupId) {
    Map<String, Object> props = getGenericKafkaConsumerProperties();
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, AvroTransactionEvent> transactionKafkaListenerContainerFactory(
        TransactionFilterStrategy transactionFilterStrategy) {
    ConcurrentKafkaListenerContainerFactory<String, AvroTransactionEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(transactionConsumerFactory(transactionClientId));
    factory.setRecordFilterStrategy(transactionFilterStrategy);
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
    factory.getContainerProperties().setCommitLogLevel(Level.INFO);
    return factory;
}

}

@ Открытый класс службы TransactionListener {

private static final Logger LOGGER = LoggerFactory.getLogger(TransactionListener.class);

@Autowired
TransactionService transactionService;

@Autowired
TransactionObjectMapper transactionObjectMapper;

@KafkaListener(topics = "${kafka.integration.transaction.topic}", containerFactory = "transactionKafkaListenerContainerFactory", errorHandler = "transactionListenerErrorHandler")
public void transactionListener(GenericRecord genericRecord) throws ParseException, IOException {

    List<Transaction> transaction = null;
    LOGGER.info("Received transaction : {}", genericRecord);

    depositoryTransaction = transactionObjectMapper.convertAvroToTransactionDataModelPojo(genericRecord.toString());
    transactionService.create(transaction);
}

}

...