Я использую встроенный брокер Kafka с пружинной загрузкой и junit 5. Я успешно подключился и вижу, что встроенный брокер работает.
В моем методе настройки я добавляю несколько сообщений в очередь, которую прослушивает мой фактический код
@BeforeAll
public void setup() {
// code to play down some messages to topic X
}
Мой потребитель / слушатель никогда не запускается, несмотря на то, что в методе установки не было ошибок
Мой потребитель настроен как
class Consumer() {
@KafkaListener(topics="X",
groupId ="...",
containerFactory="my-container-factory"
)
public void consume(ConsumerRecord<String,byte[] rec) {
//logic to handle
logger.info("Print rec : "+rec)
}
}
еще, где я настроил свой ListenerContainerFactory с именем, подобным
@Bean(name="my-container-factory")
public KafkaContainerListenerFactory<String,byte[]> factory() {
}
Что может быть не так с этим? Мои утверждения в тестовом примере не выполняются и, кроме того, я не вижу свой журнал операторы, которые должны быть напечатаны, если бы мой метод потребления когда-либо вызывался.
У меня такое ощущение, что автоматическая конфигурация из-за @SpringBootTest
и @EmbeddedKafka
настраивает какую-то другую фабрику контейнера слушателей, и поэтому, возможно, мой @KafkaListener
аннотация неверная. Я знаю, это немного расплывчато, но не могли бы вы сказать мне, что / где посмотреть? Если я запускаю как @SpringBootApplication
, мой потребитель вытягивает сообщения из фактической очереди. Так что никаких проблем с моим фактическим приложением. Это тест это не выполняется в соответствии с ожиданиями.
Пожалуйста, помогите.
Редактировать 1: в моем файле yml установлено spring.kafka.consumer.auto-offset-reset=earliest
.