Как правильно настроить неблокирующий асинхронный производитель Kafka? - PullRequest
0 голосов
/ 09 июля 2019

Я пытаюсь создать асинхронного производителя Kafka, который может быть вызван несколькими потоками одновременно. Все они публикуют данные в одной и той же теме с произвольными интервалами, вплоть до 1000 производителей, и ни один из потоков не должен блокировать другие потоки.

Я прочитал, что предпочтительный способ сделать это - создать объект-производитель и передать его всем используемым объектам потока, поскольку Kafka является потокобезопасным и асинхронным.

Мой вопрос заключается в том, что мне делать в реализации моего производителя, чтобы гарантировать, что различные потоки не блокируют другие потоки и что данные из каждого потока отправляются (учитывая, что брокер подключен к сети).

Я начал использовать части примера буферизованного производителя из https://github.com/mfontanini/cppkafka/tree/master/examples

Я инициализировал буферизованного производителя как отдельный класс и передал ссылку на этот объект во все другие классы, чтобы они могли использовать их в своих методах, вызываемых потоками.

Итак, после инициализации производителя перед запуском потоков я изначально планировал делать это в каждой функции потока:

        builder.payload(payload);
        producer.add_message(builder);
        producer.flush();

Затем я прочитал, что метод flush () блокирует и делает продюсера синхронным, и вместо этого вы должны использовать yield (),

        builder.payload(payload);
        producer.produce(builder);

, или просто метод add_message ()

        builder.payload(payload);
        producer.add_message(builder);

Эти данные затем будут отправлены в зависимости от моей настройки queue.buffering.max.ms.

Но что произойдет, когда буфер будет окончательно очищен, приведет ли это к блокированию любого из моих производителей или это произойдет автоматически в фоновом потоке? И если я установлю queue.buffering.max.ms = 0, это будет так же, как если бы я поместил flush () там, или это все еще асинхронный и неблокирующий? И если я захочу зарегистрировать данные в обратных вызовах неудачи / успеха производителя, вызовут ли эти обратные вызовы какой-либо поток производителя для блокировки?

...