Как настроить потребительскую конфигурацию Kafka для получения сообщений с этого момента? - PullRequest
0 голосов
/ 22 октября 2018

Я новичок в Kakfa и учусь создавать и потреблять сообщения в и из темы Kafka.

Я использую конфигурацию Kafka с помощью @ EnableKafka

@EnableKafka
@Configuration
public class ConsumerConfig implements ApplicationContextAware {

    @Value("${kafka.servers}")
    private String kafkaServerAddress;

    @Value("${kafka.ca.groupid}")
    private String groupId;


    private ApplicationContext context;

    public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {

        Map<String, Object> props = new HashMap<>();
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> binlogListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        DefaultKafkaConsumerFactory<String, Object> defaultFactory = consumerFactory();
        defaultFactory.setKeyDeserializer(new StringDeserializer());
        defaultFactory.setValueDeserializer(new JsonDeserializer(BinlogMessage.class));
        factory.setConsumerFactory(defaultFactory);
        return factory;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

}

1 Ответ

0 голосов
/ 22 октября 2018

Получил ответ, это можно сделать, установив для свойства AUTO_OFFSET_RESET_CONFIG самое позднее следующее:

public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    return new DefaultKafkaConsumerFactory<>(props);
}
...