В библиотеке Confluent Kafka (в данном случае версии python) есть метод продукта, который принимает функцию обратного вызова доставки:
kafka_producer.produce(topic=topic,
key=key,
value=value,
on_delivery=delivery_callback)
Этот обратный вызов вызывается независимо от того, было ли сообщение успешно доставлено или нет :
def delivery_callback(err, msg):
У меня нет какой-либо логики повторения в этой функции, если сообщение не удалось, потому что в документах говорится, что оно асинхронное.
Вместо каждых 100 сообщений или около того, яположитесь на flush()
, чтобы сообщить мне, если какие-либо сообщения не были успешно созданы:
messages_outstanding = kafka_producer.flush()
if messages_outstanding == 0:
//continue to the next batch of 100
else:
//produce the batch again
Будет ли flush()
отвечать за сообщения, которые не удалось создать?(сообщается как ошибки в delivery_callback
)
Другими словами, могу ли я быть уверен, что flush()
не вернет ноль, если какие-либо сообщения не пройдут ?