Прежде всего, я должен сказать, что я не Кафка и не эксперт по Spring, но я думаю, что это более концептуальная задача при записи в независимые ресурсы, и решение должно быть адаптировано к вашему технологическому стеку. Кроме того, я должен сказать, что это решение пытается решить проблему без внешнего компонента, такого как Debezium, потому что, по моему мнению, каждый дополнительный компонент создает проблемы при тестировании, обслуживании и запуске приложения, которые часто недооцениваются при выборе такого варианта. Также не каждая база данных может быть использована в качестве источника Debezium.
Чтобы убедиться, что мы говорим об одних и тех же целях, давайте проясним ситуацию на упрощенном примере авиакомпании, где клиенты могут покупать билеты. После успешного заказа клиент получит сообщение (почта, push-уведомление и т. Д.), Отправленное внешней системой обмена сообщениями (системой, с которой мы должны поговорить).
В традиционном мире JMS с XA-транзакцией между нашей базой данных (где мы храним заказы) и JMS-провайдером это будет выглядеть следующим образом: Клиент устанавливает заказ для нашего приложения, где мы запускаем транзакцию. Приложение сохраняет заказ в своей базе данных. Затем сообщение отправляется в JMS, и вы можете совершить транзакцию. Обе операции участвуют в транзакции, даже когда они разговаривают со своими собственными ресурсами. Поскольку транзакция XA гарантирует ACID, мы в порядке.
Давайте добавим в игру Кафку (или любой другой ресурс, который не может участвовать в транзакции XA). Поскольку больше не существует координатора, который синхронизирует обе транзакции, основной идеей следующего является разделение обработки на две части с постоянным состоянием.
Когда вы сохраняете заказ в своей базе данных, вы также можете сохранить сообщение (с агрегированными данными) в той же базе данных (например, как JSON в CLOB-столбце), которую впоследствии вы хотите отправить в Kafka. Тот же ресурс - ACID гарантированно, пока все хорошо. Теперь вам нужен механизм, который опрашивает вашу «KafkaTasks» -Table для новых задач, которые должны быть отправлены в Kafka-Topic (например, с помощью службы таймера, возможно, аннотация @Scheduled может использоваться в Spring). После того, как сообщение было успешно отправлено в Kafka, вы можете удалить запись задачи. Это гарантирует, что сообщение для Kafka отправляется только тогда, когда заказ также успешно сохраняется в базе данных приложения. Получили ли мы те же гарантии, что и при использовании транзакции XA? К сожалению, нет, так как все еще существует вероятность того, что запись в Kafka работает, но удалить задачу не удается. В этом случае механизм повтора (вам понадобится тот, который указан в вашем вопросе) обработает задачу и отправит сообщение дважды. Если ваше экономическое обоснование удовлетворено этой «хотя бы раз» гарантией, вы сделали здесь полусложное решение imho, которое может быть легко реализовано как функциональность фреймворка, так что не всем придется беспокоиться о деталях.
Если вам нужен «ровно один раз», вы не можете сохранить свое состояние в базе данных приложения (в данном случае «удаление задачи» является «состоянием»), но вместо этого вы должны сохранить его в Kafka (при условии, что у вас есть ACID гарантирует между двумя темами Кафки). Пример: допустим, у вас есть 100 задач в таблице (идентификаторы от 1 до 100), и задание обрабатывает первые 10. Вы записываете свои сообщения Kafka в их тему, а другое сообщение с идентификатором 10 - в «вашу тему». Все в той же Кафке-транзакции. В следующем цикле вы потребляете свою тему (значение 10) и принимаете это значение, чтобы получить следующие 10 задач (и удалить уже обработанные задачи).
Если есть более простые (в приложении) решения с такими же гарантиями, я с нетерпением жду вашего ответа!
Извините за длинный ответ, но я надеюсь, что это поможет.