@KafkaListener не получает записи - PullRequest
0 голосов
/ 16 октября 2018

Не могу определить, что не так с моей конфигурацией слушателя kafka.Изначально у меня есть непустая тема кафки с именем "транзакции" с несколькими записями (я вижу это в KafkaTool).Это application.yml:

spring:
  ###
  #   Kafka Settings
  ###
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      key-deserializer: com.panbet.externalbet.history.report.support.ReportsBetKeyJsonDeserializer
      value-deserializer: com.panbet.externalbet.history.report.support.ReportsBetJsonDeserializer
      group-id: external.history.group

Вот файл конфигурации Java:

@EnableKafka
@Configuration
public class KafkaConfig
{
    private final KafkaProperties properties;


    public KafkaConfig(KafkaProperties properties)
    {
        this.properties = properties;
    }


    @Bean
    public ConsumerFactory<ReportsBetKeyDto, ReportsBetDto> kafkaConsumerFactory()
    {
        return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties());
    }


    @Bean
    public ConcurrentKafkaListenerContainerFactory<ReportsBetKeyDto, ReportsBetDto> kafkaListenerContainerFactory()
    {
        ConcurrentKafkaListenerContainerFactory<ReportsBetKeyDto, ReportsBetDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        return factory;
    }
}

Мой слушатель kafka выглядит так (слушатель темы "транзакции", которая упоминалась ранее):

@Component
public class ReportsConsumer
{
    @KafkaListener(topics = { "transactions" })
    public void listen(ConsumerRecord<ReportsBetKeyDto, ReportsBetDto> record)
    {
        System.out.println(record);
    }
}

Я ожидаю: когда приложение запустится, я поймаю отладку внутри метода ReportsConsumer.listen.Но, к сожалению, ничего не происходит.Слушал не подключается к теме Кафка.Что может быть не так?Спасибо.

1 Ответ

0 голосов
/ 16 октября 2018

По умолчанию новые потребители начинают потреблять с конца темы.

Установить spring.kafka.consumer.auto-offset-reset=earliest

Я также рекомендую spring.kafka.consumer.enable-auto-commit=false, чтобы контейнер управлял смещениями.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...