Предотвратите Confluent Kafka от потери сообщений при создании - PullRequest
0 голосов
/ 13 декабря 2018

В библиотеке Confluent Kafka (в данном случае версии python) есть метод продукта, который принимает функцию обратного вызова доставки:

kafka_producer.produce(topic=topic,
                            key=key,
                            value=value,
                            on_delivery=delivery_callback)

Этот обратный вызов вызывается независимо от того, было ли сообщение успешно доставлено или нет :

def delivery_callback(err, msg):

У меня нет какой-либо логики повторения в этой функции, если сообщение не удалось, потому что в документах говорится, что оно асинхронное.

Вместо каждых 100 сообщений или около того, яположитесь на flush(), чтобы сообщить мне, если какие-либо сообщения не были успешно созданы:

messages_outstanding = kafka_producer.flush()
if messages_outstanding == 0:
   //continue to the next batch of 100
else:
   //produce the batch again

Будет ли flush() отвечать за сообщения, которые не удалось создать?(сообщается как ошибки в delivery_callback)

Другими словами, могу ли я быть уверен, что flush() не вернет ноль, если какие-либо сообщения не пройдут ?

1 Ответ

0 голосов
/ 28 декабря 2018

Подтверждены следующие результаты:

Вызов .flush() может определенно вернуть ноль, даже если сообщения не были получены.Похоже, что этот метод ожидает завершения всех обратных вызовов доставки для всех сообщений (обратный вызов может просто сообщить, что сообщение не доставлено).

С нашей точки зрения, все это на удивление неловко.Если вы не можете позволить себе потерять сообщения, вам необходимо определить, когда произошел сбой обратного вызова при доставке, и реализовать некоторую логику повторных попыток, чтобы покрыть ошибочные сообщения.

...