Что является хорошим выбором для очереди мертвых писем у потребителя кафки - PullRequest
0 голосов
/ 05 октября 2019

В настоящее время я пишу kafka cosumer. Работа потребителя заключается прежде всего в создании нескольких сущностей БД и их сохранении после обработки полезной нагрузки. Я пытаюсь написать код для обработки ошибок, которые могут возникнуть при использовании данных. Для этого я могу придумать 2 варианта (в весенней эко-системе)

  1. Отправить сообщение об ошибке в dead-letter-kafka-topic
  2. Отправить сообщение об ошибке новомуТаблица БД (Error-table)

Неудачные сообщения необходимо снова обработать.

В случае 1: Снова мне нужно написать еще один @KafkaListner, который слушает тупик-тему и обрабатывает сообщение. Здесь проблема в том, что я не могу лучше контролировать процесс повторной обработки. (Как планировщик) Поскольку KafkaListener начнет обрабатывать данные, как только данные будут опубликованы в теме недоставленных сообщений.

В случае 2: ​​ У меня есть ещеконтроль над процессом повторной обработки, так как я могу написать конечную точку REST или планировщик, который попытается повторно обработать ошибочные сообщения. (Здесь у меня дилемма, какую БД использовать. Реляционное ИЛИ какое-то хранилище значений ключей)

В основном у меня возникает проектная дилемма, и я не могу определить, какой подход лучше в SpringЭкосистема.

Оцените ответ.

Ответы [ 2 ]

0 голосов
/ 05 октября 2019

Я согласен с ответом Гари Рассела, вы можете создать KafkaConsumer экземпляр и контролировать его жизненный цикл. Класс происходит из библиотеки org.apache.kafka:kafka-clients.

В вашем конкретном случае вы можете добавить Thread.sleep(schedulerDelay) для достижения планирования. Вот упрощенный пример:

@Component
class Scheduler() {

  public void init() {
    // create kafka consumer connected to your DLQ topic
  }

  public void run() {
    try {
      while (running) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records)
          processRecordLogicGoesHere(record);
        Thread.sleep(schedulerDelay);
      }
    } finally {
      consumer.close();
    }
  }

}

Необходимо тщательно подбирать schedulerDelay, чтобы не отставать от входящих сообщений и не допускать их потери политикой очистки журнала Kafka.

ТамЕсть множество учебных пособий по работе с официальным API Kafka, вот один из них: Знакомство с Kafka Consumer

Кроме того, вы можете найти здесь несколько идей: Повторная попытка потребителяархитектура в Apache Kafka

0 голосов
/ 05 октября 2019

Я думаю, что использование Kafka - лучшее решение.

Поскольку KafkaListener начнет обрабатывать данные, как только они будут опубликованы в теме недоставленных сообщений.

Вы можете контролировать поведение, установив для autoStartup значение false для этого прослушивателя, а затем запустить / остановить прослушиватель, используя KafkaListenerEndpointRegistry при необходимости:

registry.getListenerContainer (myListenerId) .start ();

Или вы можете использовать свой собственный KafkaConsumer (созданный фабрикой потребителей) и опрашивать столько записей, сколько хотите, и закрывать потребителя, когда закончите.

...