В настоящее время я реализовал приемник kafka, который работает следующим образом:
Внутри цикла while:
- Использование сообщения от kafka
- Поместить использованное сообщение в отдельный файл.задача для обработки, чтобы основной поток и цикл потребителя не блокировались. 2.1. Передать сообщение только в случае успешной обработки или превышения числа попыток обработки.
Шаг # 2.1 может занять от 1 секундадо 6 часов до завершения .
Проблема заключается в том, что в случае сбоя приложения и наличия задач, которые не были выполнены, при перезапуске приложения (или даже при перебалансировке) эти сообщения будут использоваться и обрабатываться снова.
Я не хочу автоматически фиксировать смещения, потому что это гарантирует доставку только один раз.Я думал об использовании базы данных в качестве хранилища для состояний сообщений и реализации потребителя следующим образом:
Внутри цикла while:
- Использование сообщения от kafka
Проверка БД, если такое сообщение существует
Если сообщение существует в БД и состояние «завершено», то зафиксировать сообщение
Если сообщение существуетв дБ, но состояние «в процессе», затем перейдите к шагу № 4 напрямую
- Если сообщение не существует, то перейдите к шагу # 3
Сохранить сообщение в базе данных с состоянием «в процессе»
- Поместить использованное сообщение в отдельную задачу для обработки, чтобы основной поток и цикл потребителя не блокировались. 4.1. Передать сообщение и изменить состояние вдБ до «выполнено» только при успешной обработке или превышении числа попыток обработки.
Я не уверен, что использование дБ - правильный подход, потому что, если у меня много сообщений, он замедлитсявниз потребителя.Можете ли вы дать мне какие-либо предложения о том, как правильно реализовать потребителя, чтобы каждое сообщение обрабатывалось только один раз?