Повторно использовать сообщения, для которых смещение не было зафиксировано - PullRequest
0 голосов
/ 26 июня 2018

У меня есть пользовательский потребитель Kafka, в котором я использую для отправки некоторых запросов REST API. Согласно ответу API, я либо фиксирую смещение, либо пропускаю сообщение без фиксации.

Минимальный пример:

while (true) {

    ConsumerRecords<String, Object> records = consumer.poll(200);
    for (ConsumerRecord<String, Object> record : records) {

        // Sending a POST request and retrieving the answer
        // ...

        if (responseCode.startsWith("2")) {
            try { 
               consumer.commitSync();
            } catch(CommitFailedException ex) {
              ex.printStackTrace(); 
            }
        } else {
              // Do Nothing
        }
    }
}

Теперь, когда ответ от REST API не начинается с 2, смещение не фиксируется, но сообщение повторно не используется. Как я могу заставить потребителя повторно потреблять сообщения с незафиксированными смещениями?

Ответы [ 2 ]

0 голосов
/ 27 июня 2018

Убедитесь, что ваши данные являются идемпотентными, если вы планируете использовать seek (). Поскольку вы выборочно фиксируете смещения, пропущенные записи, возможно, будут перед зафиксированными (успешно обработанными) записями. Если вы выполняете функцию seek (), которая перемещает указатель вашего groupId на незафиксированное смещение и запускает воспроизведение, вы также получите эти успешно обработанные сообщения. Он также может стать бесконечным циклом.

В качестве альтернативы, вы можете сохранить метаданные неуспешной записи в памяти или в базе данных и воспроизвести тему, начиная с «poll (retention.ms)», чтобы все записи воспроизводились обратно, но добавлялся фильтр для обработки только тех через API, чьи метаданные совпадают с что ты сохранил ранее. Делайте это как пакетную обработку один раз в час или несколько часов.

0 голосов
/ 26 июня 2018

Фиксация смещений - это просто способ сохранить текущее смещение, также известное как позиция, получателя. Поэтому, в случае остановки, он (или новый пользовательский экземпляр вступает во владение) может найти свою предыдущую позицию и перезапустить потребление оттуда.

Таким образом, даже если вы не делаете коммит, позиция потребителя перемещается, как только вы получаете записи. Если вы хотите пересмотреть некоторые записи, вы должны изменить текущую позицию потребителя.

С помощью клиента Java вы можете установить позицию, используя seek().

В вашем сценарии вы, вероятно, хотите рассчитать новую позицию относительно текущей позиции. Если это так, вы можете найти текущую позицию, используя position().

...