Повторное использование сообщений из темы сжатия журнала kafka - PullRequest
1 голос
/ 08 ноября 2019

У меня есть приложение Spring с потребителем Kafka, использующее аннотацию @KafkaListerner. Используемая тема сжата в журнале, и у нас может быть сценарий, в котором мы должны снова использовать сообщения темы. Каков наилучший способ достичь этого программно? Мы не контролируем настройку темы Kafka.

1 Ответ

2 голосов
/ 08 ноября 2019
    @KafkaListener(...)
    public void listen(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        System.out.println(in);
        if (this.resetNeeded) {
            consumer.seekToBeginning(consumer.assignment());
            this.resetNeeded = false;
        }
    }

Если вы хотите выполнить сброс, когда слушатель находится в режиме ожидания (нет записей), вы можете включить события в режиме ожидания и выполнить поиск, прослушивая ListenerContainerIdleEvent в методе ApplicationListener или @EventListener.

Событие имеет ссылку на потребителя.

РЕДАКТИРОВАТЬ

@SpringBootApplication
public class So58769796Application {

    public static void main(String[] args) {
        SpringApplication.run(So58769796Application.class, args);
    }

    @KafkaListener(id = "so58769796", topics = "so58769796")
    public void listen1(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println("One:" + key + ":" + value);
    }

    @KafkaListener(id = "so58769796a", topics = "so58769796")
    public void listen2(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println("Two:" + key + ":" + value);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so58769796")
                .compact()
                .partitions(1)
                .replicas(1)
                .build();
    }

    boolean reset;

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so58769796", "foo", "bar");
            System.out.println("Hit enter to rewind");
            System.in.read();
            this.reset = true;
        };
    }

    @EventListener
    public void listen(ListenerContainerIdleEvent event) {
        System.out.println(event);
        if (this.reset && event.getListenerId().startsWith("so58769796-")) {
            event.getConsumer().seekToBeginning(event.getConsumer().assignment());
        }
    }

}

и

spring.kafka.listener.idle-event-interval=5000

РЕДАКТИРОВАНИЕ2

Вот еще один метод - в этом случае мы перематываем каждый раз при запуске приложения (и по требованию) ...

@SpringBootApplication
public class So58769796Application implements ConsumerSeekAware {

    public static void main(String[] args) {
        SpringApplication.run(So58769796Application.class, args);
    }

    @KafkaListener(id = "so58769796", topics = "so58769796")
    public void listen(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println(key + ":" + value);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so58769796")
                .compact()
                .partitions(1)
                .replicas(1)
                .build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template,
            KafkaListenerEndpointRegistry registry) {

        return args -> {
            template.send("so58769796", "foo", "bar");
            System.out.println("Hit enter to rewind");
            System.in.read();
            registry.getListenerContainer("so58769796").stop();
            registry.getListenerContainer("so58769796").start();
        };

    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...