Интерфейс dr_cb
устарел и устарел, вместо него следует использовать более богатый dr_msg_cb
, где у вас есть доступ ко всем полям rd_kafka_message_t
, для вашего случая rkmessage->rkt
для топи c.
Но, как правило, вам не следует повторять попытку создания сообщения о сбое в отчете о доставке, поскольку librdkafka сделает все возможное, чтобы создать сообщение в настроенных ограничениях (message.timeout.ms
и retries
), там не так много, что приложение может добавить, повторив попытку того же сообщения.
Вместо этого настройте message.timeout.ms
для удовлетворения потребностей вашего бизнеса, отвечая на вопрос «Как долго имеет смысл пытаться создать этот фрагмент данных? ? "и установите для retries
его максимальное значение (поскольку число повторных попыток не имеет значения с точки зрения приложения).
Если дубликаты или порядок сообщений имеют решающее значение, вам также следует рассмотреть возможность использования идемпотентного производителя (enable.idempotence=true
).
И, наконец, librdkafka предоставляет API индикатора устойчивости сообщений (rd_kafka_message_status()
) для каждого гастронома. проверенное или сбойное сообщение, сообщающее приложению, было ли сообщение ..:
- определенно не сохранено
- возможно, сохранено (повторная попытка вручную может привести к дублированию)
- определенно сохраняется
См. https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#message -надежность для получения дополнительной информации о надежности сообщений в librdkafka.