Spring Integration для чтения сообщения из темы Кафки на основе метки времени - PullRequest
0 голосов
/ 13 февраля 2019

При использовании пружины kafka Я могу читать сообщения из темы, основываясь на отметке времени, с приведенным ниже кодом -

                ConsumerRecords<String, String> records = consumer.poll(100);
                if (flag) {
                    Map<TopicPartition, Long> query = new HashMap<>();
                    query.put(new TopicPartition(kafkaTopic, 0), millisecondsFromEpochToReplay);

                    Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
                    if(result != null)
                    {
                        records = ConsumerRecords.empty();
                    }

                    result.entrySet().stream()
                            .forEach(entry -> consumer.seek(entry.getKey(), entry.getValue().offset()));

                    flag = false;
                }

Как добиться такой же функциональности с помощью интеграции пружины DSL - с KafkaMessageDrivenChannelAdapter?Как мы можем установить потоки интеграции и прочитать сообщение из темы на основе метки времени?

1 Ответ

0 голосов
/ 13 февраля 2019

Настройте контейнер получателя адаптера с ConsumerAwareRebalanceListener и выполните поиск / поиск при назначении разделов.

РЕДАКТИРОВАНИЕ

Использование Spring Boot (но выможно настроить контейнер, однако вы создаете контейнер) ...

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=so54664761

и

@SpringBootApplication
public class So54664761Application {

    public static void main(String[] args) {
        SpringApplication.run(So54664761Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("so54664761", "foo");
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so54664761", 1, (short) 1);
    }

    @Bean
    public IntegrationFlow flow(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        ConcurrentMessageListenerContainer<String, String> container = container(containerFactory);
        return IntegrationFlows.from(new KafkaMessageDrivenChannelAdapter<>(container))
                .handle(System.out::println)
                .get();
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> container(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("so54664761");
        container.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

            @Override
            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                System.out.println("Partitions assigned - do the lookup/seeks here");
            }

        });
        return container;
    }

}

и

Partitions assigned - do the lookup/seeks here
GenericMessage [payload=foo, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2f5b2297, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=so54664761, kafka_receivedTimestamp=1550241100112}]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...