У меня есть следующий класс, который может создавать сообщения для Kafka:
using RdKafka::Conf;
class Producer : public RdKafka::DeliveryReportCb {
public:
Producer(const string& brokers): config_{Conf::create(Conf::CONF_GLOBAL)} {
string empty;
config_->set("bootstrap.servers", brokers_, empty);
config_->set("batch.size", "1000", empty);
config_->set("linger.ms", "5", empty)
config_->set("dr_cb", this, empty);
producer_ = RdKafka::Producer::create(config_.get(), empty);
}
private:
unique_ptr<Conf> config_;
RdKafka::Producer* producer_;
};
Затем я использую этот класс для создания 1000 сообщений:
Producer producer("10.104.126.247:9092"); // remote machine's running Kafka address.
for (int i = 0; i < 1000; i++) {
producer.Produce("item" + to_string(i));
}
где метод Produce
определяется как:
void Producer::Produce(const string& item) {
Conf* topic_config = Conf::create(Conf::CONF_TOPIC);
std::string error;
RdKafka::Topic* topic = RdKafka::Topic::create(producer_, "input", topic_config, error);
const string empty_key = "";
string* accepted = new string(item);
producer_->produce(
topic,
0,
RdKafka::Producer::RK_MSG_COPY | RdKafka::Producer::RK_MSG_BLOCK,
(void*)item.c_str(), item.length(),
empty_key.c_str(), 0,
(void*)accepted);
cout << "Produce request sent for " << item << endl;
}
У меня есть метод, который печатает сообщение всякий раз, когда получен обратный вызов:
void Producer::dr_cb(RdKafka::Message& message) override {
string* msg = static_cast<string*>(message.msg_opaque());
std::cout << "Callback received for " << *msg << endl;
delete msg;
}
, а также поток, который опрашивает обратные вызовы:
// Method invoked on a background thread
void Producer::Polling() {
while (true) {
producer_->poll(100);
}
}
Когда я запускаю программу, я вижу, что все производственные запросы выполняются мгновенно:
Produce request sent for item0
Produce request sent for item1
...
Produce request sent for item999
и только после этого я медленно получаю подтверждения обратного вызова:
Callback received for item0
Callback received for item1
... // Takes a long time
Callback received for item999
Вопрос: как ускорить продюсера, чтобы я быстрее получал подтверждения обратного вызова?