Не могу определить, что не так с моей конфигурацией слушателя kafka.Изначально у меня есть непустая тема кафки с именем "транзакции" с несколькими записями (я вижу это в KafkaTool).Это application.yml:
spring:
###
# Kafka Settings
###
kafka:
consumer:
bootstrap-servers: localhost:9092
key-deserializer: com.panbet.externalbet.history.report.support.ReportsBetKeyJsonDeserializer
value-deserializer: com.panbet.externalbet.history.report.support.ReportsBetJsonDeserializer
group-id: external.history.group
Вот файл конфигурации Java:
@EnableKafka
@Configuration
public class KafkaConfig
{
private final KafkaProperties properties;
public KafkaConfig(KafkaProperties properties)
{
this.properties = properties;
}
@Bean
public ConsumerFactory<ReportsBetKeyDto, ReportsBetDto> kafkaConsumerFactory()
{
return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<ReportsBetKeyDto, ReportsBetDto> kafkaListenerContainerFactory()
{
ConcurrentKafkaListenerContainerFactory<ReportsBetKeyDto, ReportsBetDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
return factory;
}
}
Мой слушатель kafka выглядит так (слушатель темы "транзакции", которая упоминалась ранее):
@Component
public class ReportsConsumer
{
@KafkaListener(topics = { "transactions" })
public void listen(ConsumerRecord<ReportsBetKeyDto, ReportsBetDto> record)
{
System.out.println(record);
}
}
Я ожидаю: когда приложение запустится, я поймаю отладку внутри метода ReportsConsumer.listen.Но, к сожалению, ничего не происходит.Слушал не подключается к теме Кафка.Что может быть не так?Спасибо.