Мы используем Spring Cloud Stream Kafka Binder (с интеграцией Project Reactor, т.е.
Flux
потоки )
и фиксация вручную смещения
(т.е. autoCommitOffset = false
).
Мы пытаемся написать интеграционный тест с Embedded Kafka от spring-kafka-test
это должно утверждать, что все это работает, вручную считывая смещение группы потребителей, используя admin client ,
до и после теста отправляет сообщение в нашу тему.
Тесты периодически проходят неудачно. Используя awaitility , теперь мы ждем до 10 секунд, чтобы опросить смещение,
и это, похоже, решает большинство наших проблем, поскольку смещение изменится примерно через 7 секунд - но это неудовлетворительно для тестирования.
Есть ли способ убедиться, что Spring Cloud Stream Kafka Binder запишет изменение смещения немедленно, как только мы вручную подтвердим получение сообщения, позвонив по номеру Acknowledgement.acknowledge()
?
Другими словами: как мы можем проверить, что acknowledge
был вызван в наших тестах без ожидания?
Мы используем Kotlin, Mockito и Mockito-kotlin и поэтому не можем использовать PowerMockito.