Как можно сразу проверить, что сообщение было подтверждено при интеграционном тестировании с использованием Embedded Kafka в Spring Cloud Stream? - PullRequest
1 голос
/ 01 июля 2019

Мы используем Spring Cloud Stream Kafka Binder (с интеграцией Project Reactor, т.е. Flux потоки ) и фиксация вручную смещения (т.е. autoCommitOffset = false).

Мы пытаемся написать интеграционный тест с Embedded Kafka от spring-kafka-test это должно утверждать, что все это работает, вручную считывая смещение группы потребителей, используя admin client , до и после теста отправляет сообщение в нашу тему.

Тесты периодически проходят неудачно. Используя awaitility , теперь мы ждем до 10 секунд, чтобы опросить смещение, и это, похоже, решает большинство наших проблем, поскольку смещение изменится примерно через 7 секунд - но это неудовлетворительно для тестирования.

Есть ли способ убедиться, что Spring Cloud Stream Kafka Binder запишет изменение смещения немедленно, как только мы вручную подтвердим получение сообщения, позвонив по номеру Acknowledgement.acknowledge()?

Другими словами: как мы можем проверить, что acknowledge был вызван в наших тестах без ожидания?

Мы используем Kotlin, Mockito и Mockito-kotlin и поэтому не можем использовать PowerMockito.

1 Ответ

2 голосов
/ 01 июля 2019

Проблема в Consumer не является поточно-ориентированной.Коммиты должны быть выполнены в потоке контейнера.Если потребитель находится в poll(), когда вы подтверждаете, вы должны подождать до pollTimeout, прежде чем будет выполнено смещение.

По умолчанию pollTimeout составляет 5 секунд.

Вы можете добавить ListenerContainerCustomizer @Bean, чтобы изменить ContainerProperties.

...