Неподтвержденное сообщение больше не используется - PullRequest
0 голосов
/ 22 мая 2018

Я использую Kafka слушатель сообщений потребителей.Я включил подтверждение вручную.Я использовал метод Acknowledgment.acknowledge () для фиксации смещения в случае успеха.В случае исключения из-за сбоя вызова API, я не зафиксировал его.

Я немного сбит с толку, поскольку незафиксированное сообщение не будет снова использоваться, пока я не перезапущу свое приложение.Я новичок в kafka, поэтому я уверен, что упускаю что-то действительно элементарное, но поиск был бесполезен для этого.Может кто-нибудь объяснить или указать мне правильный источник, где я могу знать, что здесь не так?

 @KafkaListener(topics = "temp_topic", groupId = "temp-consumer", containerFactory = "tempListenerContainerFactory")
public void receive(ConsumerRecord<?, ?> consumerRecord,
                                   Acknowledgment acknowledgment) {
    try {
        log.info("Consuming message : {} ", consumerRecord.value().toString());
        String message = consumerRecord.value().toString();
         objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
         Payload payload = objectMapper.readValue(message,Payload.class);
         tempService.processEvent(payload);
         log.info("Message: {} successfully consumed",message);
         acknowledgment.acknowledge();
    } catch (Exception e) {
        log.error("Error while listening to kafka event: ", e);
    }
}

Заранее спасибо.

Ответы [ 2 ]

0 голосов
/ 23 мая 2018

Я немного прочитал об этом и нашел способ справиться с ошибочными сообщениями.Мы делаем 3 темы

  1. Основная тема
  2. Дополнительная тема
  3. Мертвая тема

Идея состоит в том, чтобы использовать сообщение из основной темы, и, если по какой-либо причине оно терпит неудачу, переместить его в побочную тему, где оно будет повторяться n количество раз в t временные интервалы.

Если произойдет сбой после всех повторных попыток в боковой очереди, переместите его в мертвую тему, где он просто уведомит (используя электронную почту или любую другую службу) о сбое.

Я не уверен, является ли это стандартом, но это имело смысл из всего, что я прочитал до сих пор.

Пожалуйста, не стесняйтесь добавлять или исправлять это, если можете.

0 голосов
/ 23 мая 2018

Подтверждение не меняет точку, в которой ваш потребитель фактически читает.
Он только фиксирует смещение в некотором хранилище (тема zookeeper или kafka со смещениями) и не вносит никаких изменений в смещение, отслеживаемое потребителем - например,вы не можете зафиксировать текущее смещение и переопределить его, сделав ack для какого-то следующего смещения.

Есть несколько способов, которыми вы можете попытаться справиться с ошибочными сообщениями:

  1. Вы можете бытьподготовлен к конкретным типам сбоев (например, тайм-аут, ошибка десериализации) - перехватите их внутри слушателя и предпримите правильные действия (например, подождите / повторите попытку или завершите работу приложения - в зависимости от характера проблемы).
  2. Вы можете пропустить сбойсообщения - ловите ошибки внутри слушателя, помещайте их в хранилище для сообщений об ошибках (см. Очередь мертвых писем) и затем проверяйте их.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...