Spring Kafka - потребляет последние N сообщений для разделов для любой темы - PullRequest
4 голосов
/ 11 октября 2019

Я пытаюсь прочитать запрошенный номер сообщения kafka. Для нетранзакционных сообщений мы будем искать из endoffset - N для M разделов начинать опрос и собирать сообщения, где текущее смещение меньше конечного смещения для каждого раздела. Для идемпотентных / транзакционных сообщений мы должны учитывать маркеры транзакций / дубликаты сообщений, и значения смещений не будут непрерывными, в этом случае endoffset - N не будет возвращать N сообщений, и нам нужно будет вернуться и искать больше сообщений, пока у нас не будет N сообщенийдля каждого раздела или начальное смещение достигнуто

Поскольку существует несколько разделов, мне необходимо отслеживать все считанные смещения, чтобы я мог остановиться, когда все будет сделано. Существует два шага: первый шаг для вычисления начального смещения (конечное смещение - запрошенный номер сообщений) и конечное смещение. (смещения не являются непрерывными, есть пропуски), и я бы искал раздел для начала потребления с начального смещения. Второй шаг заключается в опросе сообщений и подсчете сообщений в каждом разделе, и если мы не встретим запрошенное количество сообщений, повторите первый и второй шаг снова, пока не встретим количество сообщений для каждого раздела.

Условия

Первоначальный опрос может не вернуть никаких записей, поэтому продолжайте опрос. Прекратите опрос, когда вы достигли конечного смещения для каждого раздела, или опрос не даст результатов. Проверьте каждый раздел на наличие сообщений, прочитанных так же, как и запрошенные сообщенияЕсли да, отметьте как выполненное, если нет, отметьте как продолжить и повторите шаги. Учитывайте пробелы в сообщениях. Должен работать как для транзакционного, так и для нетранзакционного производителя.

Вопрос:

Как мне отследить, чтобы все сообщения были прочитаны для каждого раздела и вышли из цикла? Сообщения в каждом разделе будут приходить по порядку, если это будет полезно.

Поддерживает ли Spring Kafka такой вариант использования? Более подробную информацию можно найти здесь

Обновление : я прошу прочитать последние N сообщений в каждом разделе. Разделы и нет сообщений является пользовательский ввод. Я хотел бы сохранить все управление смещением в памяти. По сути, мы пытаемся читать сообщения в порядке LIFO. Это усложняет задачу, поскольку Kafka позволяет читать вперед, а не назад.

Ответы [ 2 ]

0 голосов
/ 16 октября 2019

Так что, если я вас правильно понимаю, это должно быть выполнимо со стандартной кафкой Consumer.

Consumer<?, Message> consumer = ...

public Map<Integer, List<Message>> readLatestFromPartitions(String topic, Collection<Integer> partitions, int count) {

    // create the TopicPartitions we want to read
    List<TopicPartition> tps = partitions.stream().map(p -> new TopicPartition(topic, p)).collect(toList());
    consumer.assign(tps);

    // create and initialize the result map
    Map<Integer, List<Message>> result = new HashMap<>();
    for (Integer i : partitions) { result.add(new ArrayList<>()); }

    // read until the expected count has been read for all partitions
    while (result.valueSet().stream().findAny(l -> l.size() < count)) {
        // read until the end of the topic
        ConsumerRecords<?, Message> records = consumer.poll(Duration.ofSeconds(5));
        while (records.count() > 0) {
            Iterator<ConsumerRecord<?, Message>> recordIterator = records.iterator();
            while (recordIterator.hasNext()) {
                ConsumerRecord<?, Message> record = recordIterator.next();
                List<Message> addTo = result.get(record.partition);
                // only allow 10 entries per partition
                if (addTo.size() >= count) {
                    addTo.remove(0);
                }
                addTo.add(record.value);
            }
            records = consumer.poll(Duration.ofSeconds(5));
        }
        // now we have read the whole topic for the given partitions.
        // if all lists contain the expected count, the loop will finish;
        // otherwise it will wait for more data to arrive.
    }

    // the map now contains the messages in the order they were sent,
    // we want them reversed (LIFO)
    Map<Integer, List<Message>> returnValue = new HashMap<>();
    result.forEach((k, v) -> returnValue.put(k, Collections.reverse(v)));
    return returnValue;
}
0 голосов
/ 15 октября 2019

Почему такая необходимость, я не понимаю. Кафка сама управляет, когда в очереди ничего нет. Если сообщения переходят из одного состояния в другое, у них могут быть отдельные очереди / темы. Однако вот как это можно сделать.

Когда мы получаем сообщения из раздела, используя что-то вроде -

ConsumerIterator<byte[], byte[]> it = something; //initialize consumer
while (it.hasNext()) {
  MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
  String kafkaMessage = new String(messageAndMetadata.message());
  int partition = messageAndMetadata.partition();
  long offset = messageAndMetadata.offset();
  boolean processed = false;
  do{
    long maxOffset = something; //fetch from db
    //if offset<maxOffset, then process messages and manual commit
    //else busy wait or something more useful
  }while(processed);
}

Мы получаем информацию о смещениях, номере раздела и самом сообщении. Вы можете сделать что-нибудь с этой информацией.

Для вашего случая использования вы можете также решить сохранить смещенные смещения в базе данных, чтобы в следующий раз можно было корректировать смещения. Кроме того, я бы порекомендовал отключить подключение для очистки и окончательного сохранения обработанных смещений в БД.

...