Я пытаюсь создать асинхронного производителя Kafka, который может быть вызван несколькими потоками одновременно.
Все они публикуют данные в одной и той же теме с произвольными интервалами, вплоть до 1000 производителей, и ни один из потоков не должен блокировать другие потоки.
Я прочитал, что предпочтительный способ сделать это - создать объект-производитель и передать его всем используемым объектам потока, поскольку Kafka является потокобезопасным и асинхронным.
Мой вопрос заключается в том, что мне делать в реализации моего производителя, чтобы гарантировать, что различные потоки не блокируют другие потоки и что данные из каждого потока отправляются (учитывая, что брокер подключен к сети).
Я начал использовать части примера буферизованного производителя из https://github.com/mfontanini/cppkafka/tree/master/examples
Я инициализировал буферизованного производителя как отдельный класс и передал ссылку на этот объект во все другие классы, чтобы они могли использовать их в своих методах, вызываемых потоками.
Итак, после инициализации производителя перед запуском потоков я изначально планировал делать это в каждой функции потока:
builder.payload(payload);
producer.add_message(builder);
producer.flush();
Затем я прочитал, что метод flush () блокирует и делает продюсера синхронным, и вместо этого вы должны использовать yield (),
builder.payload(payload);
producer.produce(builder);
, или просто метод add_message ()
builder.payload(payload);
producer.add_message(builder);
Эти данные затем будут отправлены в зависимости от моей настройки queue.buffering.max.ms.
Но что произойдет, когда буфер будет окончательно очищен, приведет ли это к блокированию любого из моих производителей или это произойдет автоматически в фоновом потоке?
И если я установлю queue.buffering.max.ms = 0, это будет так же, как если бы я поместил flush () там, или это все еще асинхронный и неблокирующий?
И если я захочу зарегистрировать данные в обратных вызовах неудачи / успеха производителя, вызовут ли эти обратные вызовы какой-либо поток производителя для блокировки?