@KafkaListener Читайте с самого начала каждый раз - PullRequest
0 голосов
/ 30 ноября 2018

Я использую следующий пример, чтобы использовать потребителя Spring Kafka для чтения сообщений.Мой вариант использования требует, чтобы каждый раз, когда создавалось сообщение, слушатель читал с самого начала, каждый раз.просто потребляет сообщения, идущие вперед.

Есть ли способ заставить его искать начало каждый раз?

Ответы [ 4 ]

0 голосов
/ 17 июля 2019

Вот как я это буду реализовывать.Вам необходимо реализовать интерфейс ConsumerSeekAware и выполнить несколько реализаций для метода onPartitionsAssigned.Вы также можете сделать seekToBegining по требованию, если отправляете переменную среды при перезапуске приложения.Я еще не реализовал это!

@Service
@EnableKafka
public class Service implements ConsumerSeekAware {



    @KafkaListener(topics = "${topicName}", groupId = "${groupId}")
    public void listen(@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                       @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts,
                       @Payload List<String> messageBatch
    ) {
            //do a bunch of stuff
    }



    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        String topic= Optional.ofNullable(System.getProperty(TOPIC_NAME)).orElseThrow(()->new RuntimeException("topicName needs to be set"));
        assignments.keySet().stream().filter(partition->topic.equals(partition.topic()))
                .forEach(partition -> callback.seekToBeginning(topic, partition.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {}

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {}
}
0 голосов
/ 20 декабря 2018

Я думаю, вы должны попытаться написать ConsumerSeekAwareListener и стремиться к смещению 0 каждый раз, когда читаете сообщение.Звучит как сумасшедший обходной путь, но это может помочь.Надеюсь, это поможет вам: -)

class Listener implements ConsumerSeekAware {

 private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

   ----Override all methods that are needed----

@KafkaListener(...)
    public void listen(@Payload String message) {

            this.seekCallBack.get().seek(topic, partition, 0);
        }
    }
}
0 голосов
/ 17 января 2019

@ Nimo1981 Так что это реализация с простой Java.Я не уверен, отвечает ли это вашим потребностям.Таким образом, в основном я передаю смещение 0 (то есть, даже если я читаю из темы Кафки, я возвращаюсь к смещению, которое находится в начале). Я не уверен, рассматривали ли вы эту реализацию, но, пожалуйста, дайте мне знать, если этоэто то, что вы ищете

Не указывайте CommitCountObj.Это не нужно для вашего.Поэтому по умолчанию offsetMap будет иметь следующую запись смещения, подобную этой,

offsetMap.put (new TopicPartition (record.topic (), record.partition ()), новый OffsetAndMetadata (record.offset () + 1, «какое-то сообщение об успешной фиксации»));

, но для вашего случая использования я немного изменен, он работает хорошо, когда потребитель не перезапускается

offsetMap.put (new TopicPartition (record.topic (), record.partition ()), новый OffsetAndMetadata (0, «фиксация не выполнена»));

public class KafkaConsumerClass {

    private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaConsumerClass.class);
    private CommitCountClass commitCountobj = new CommitCountClass();

    public Consumer<String, List<FeedBackConsumerClass>> createConsumer() {
        Map<String, Object> consumerProps = new HashMap<String, Object>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:7070,localhost:7072");
        consumerProps.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 50000);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "first_group-client1");
        // consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "first_group");
        // consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaConsumerInterceptor.class);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1500);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return new KafkaConsumer<String, List<FeedBackConsumerClass>>(consumerProps);
    }

    public void consumeRecord() {
        log.info("Coming inside consumer consumer");
        ArrayList<String> topicList = new ArrayList<String>();
        topicList.add("topic1");
        commitCountobj.setCount(0);
        Consumer<String, List<FeedBackConsumerClass>> kafkaConsumer = createConsumer();
        kafkaConsumer.subscribe(topicList);
        log.info("after subscribing");

        Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();

        while (true) {

            ConsumerRecords<String, List<FeedBackConsumerClass>> recordList = kafkaConsumer.poll(Long.MAX_VALUE);
            // kafkaConsumer.seekToBeginning(kafkaConsumer.assignment());

            log.info("Inside while loop:" + recordList);
            if (!recordList.isEmpty()) {
                recordList.forEach(record -> {
                    int i = 0;
                    System.out.println(record.toString());
                    // we can make the call to the API here
                    // call the db here or any API and process the record
                    // then call the code to commit
                    // since the commit is switched off, it becomes a developers responsibility to do the auto commit
                    offsetMap.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(0, "no metadata/offset commited"));
                    // here we are incrementing the offsetMap so that we are making sure we are storing the
                    // next set of offsets in the map
                    if (commitCountobj.getCount() % 1000 == 0) {
                        kafkaConsumer.commitAsync(offsetMap, new OffsetCommitCallback() {

                            @Override
                            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                    Exception exception) {
                                // TODO Auto-generated method stub
                                if (exception != null) {
                                    // retry it now with a sync
                                    // possibility of error occuring here as well
                                    // so capture the exception and exit the consumer gracefully
                                    kafkaConsumer.commitSync();
                                    log.error(exception.getMessage());
                                }
                            }
                        });
                    }
                    commitCountobj.setCount(i++);
                });
            }

        }
    }

}
0 голосов
/ 01 декабря 2018

использовать сжатую тему с 1 разделом для хранения списка конфигураций.Затем он должен вызываться конечной точкой покоя, и он должен отображать полный уникальный список конфигураций

Способ, которым вы должны реализовать это, - использовать Kafka Streams, KTable и рассчитывать интерактивных запросов за слоем REST.Не стандартный потребитель, который должен перематывать себя, чтобы получить наиболее обновленное состояние системы.

Пример этого уже существует в инфраструктуре Kafka Connect, где у него есть тема конфигурации, и вы можете получить доступ только к самому последнему значению GET /connectors/name/config, и только если вы перезапустите его или масштабируете для большего количества экземпляров.Будет ли он потреблять все сообщения снова.Реестр схем также является примером этого и хранит внутреннюю таблицу Hashmap всех схем в теме _schemas и имеет REST API для чтения, вставки, удаления

По существу, когда вы получаете новыйВ конфигурации для данного ключа вы можете либо «заменить» старое значение для данного ключа на совершенно новое, либо каким-либо образом «объединить» старое значение с новыми данными.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...