Потребитель Kafka не получает сообщение в Spring Boot - PullRequest
1 голос
/ 10 апреля 2019

Мой потребитель Spring / Java не может получить доступ к сообщению, созданному производителем. Однако, когда я запускаю потребителя из консоли / терминала, он может получить сообщение, созданное производителем spring / java.

Конфигурация потребителя:

@Component
@ConfigurationProperties(prefix="kafka.consumer")
public class KafkaConsumerProperties {

    private String bootstrap;
    private String group;
    private String topic;

    public String getBootstrap() {
        return bootstrap;
    }

    public void setBootstrap(String bootstrap) {
        this.bootstrap = bootstrap;
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }
}

Конфигурация слушателя:

@Configuration
@EnableKafka
public class KafkaListenerConfig {

    @Autowired
    private KafkaConsumerProperties kafkaConsumerProperties;

    @Bean
    public Map<String, Object> getConsumerProperties() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrap());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroup());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        return properties;
    }

    @Bean
    public Deserializer stringKeyDeserializer() {
        return new StringDeserializer();
    }

    @Bean
    public Deserializer transactionJsonValueDeserializer() {
        return new JsonDeserializer(Transaction.class);
    }

    @Bean
    public ConsumerFactory<String, Transaction> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(getConsumerProperties(), stringKeyDeserializer(), transactionJsonValueDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Transaction> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Transaction> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(1);
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

Слушатель Кафки:

@Service
public class TransactionConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(Transaction.class);

    @KafkaListener(topics={"transactions"}, containerFactory = "kafkaListenerContainerFactory")
    public void onReceive(Transaction transaction) {
        LOGGER.info("transaction = {}",transaction);
    }
}

Потребительское приложение:

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

}

ТЕСТ-СЛУЧАЙ 1: ПРОЙДЕН Я начал свой весенний / Java-продюсер и запускаю потребителя с консоли. Когда я создаю формирователь сообщения, мой консольный потребитель может получить доступ к сообщению.

ТЕСТ, СЛУЧАЙ 2: СБОЙ Я запустил свой Spring / Java-клиент и запустил его с консоли. Когда я создаю консоль производителя сообщений, мой потребитель Spring / Java не может получить доступ к сообщению.

ТЕСТ, СЛУЧАЙ 3: СБОЙ Я начал свой потребитель весны / Java и управляю производителем весны / Java. Когда я создаю сообщение из источника Spring / Java, мой потребитель Spring / Java не может получить доступ к сообщению.

Вопрос

  1. Что-то не так в моем коде пользователя?

  2. Мне не хватает какой-либо конфигурации для моего слушателя kafka?

  3. Нужно ли явно запускать прослушиватель? (Я так не думаю, так как вижу в журнале терминала соединение с темой, но я не уверен)

1 Ответ

0 голосов
/ 11 апреля 2019

Хорошо, вы пропали без вести AUTO_OFFSET_RESET_CONFIG в Consumer Configs

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

auto.offset.reset

Что делать, когда нетначальное смещение в Кафке или если текущее смещение больше не существует на сервере (например, потому что эти данные были удалены):

самое раннее: автоматически сбрасывает смещение насамое раннее смещение

самое последнее: автоматически сбрасывает смещение на самое последнее смещение

нет: вызывает исключение для потребителя, если не найдено предыдущее смещение длягруппа потребителя

что-либо еще: сгенерирует исключение для потребителя

Примечание: auto.offset.reset до earliest будет работать, только если kafka делаетне иметь смещения для этой группы потребителей (поэтому в вашем случае необходимо добавить это свойство с новой группой потребителей и перезапустить приложение)

...