Как ускорить Kafka Producer, чтобы быстрее получать подтверждения обратного вызова для производственных вызовов? - PullRequest
0 голосов
/ 17 июня 2020

У меня есть следующий класс, который может создавать сообщения для Kafka:

using RdKafka::Conf;

class Producer : public RdKafka::DeliveryReportCb {
public:
    Producer(const string& brokers): config_{Conf::create(Conf::CONF_GLOBAL)} {
        string empty;
        config_->set("bootstrap.servers", brokers_, empty);
        config_->set("batch.size", "1000", empty);
        config_->set("linger.ms", "5", empty)
        config_->set("dr_cb", this, empty);
        producer_ = RdKafka::Producer::create(config_.get(), empty);
    }
private:
    unique_ptr<Conf> config_;
    RdKafka::Producer* producer_;
};

Затем я использую этот класс для создания 1000 сообщений:

Producer producer("10.104.126.247:9092"); // remote machine's running Kafka address.
for (int i = 0; i < 1000; i++) {
    producer.Produce("item" + to_string(i));
}

где метод Produce определяется как:

void Producer::Produce(const string& item) {       
    Conf* topic_config = Conf::create(Conf::CONF_TOPIC);
    std::string error;
    RdKafka::Topic* topic = RdKafka::Topic::create(producer_, "input", topic_config, error);
    const string empty_key = "";
    string* accepted = new string(item);
    producer_->produce(
        topic, 
        0,
        RdKafka::Producer::RK_MSG_COPY | RdKafka::Producer::RK_MSG_BLOCK,
        (void*)item.c_str(), item.length(),
        empty_key.c_str(), 0,
        (void*)accepted);
    cout << "Produce request sent for " << item << endl;
}

У меня есть метод, который печатает сообщение всякий раз, когда получен обратный вызов:

void Producer::dr_cb(RdKafka::Message& message) override {
    string* msg = static_cast<string*>(message.msg_opaque());
    std::cout << "Callback received for " << *msg << endl;
    delete msg;
}

, а также поток, который опрашивает обратные вызовы:

// Method invoked on a background thread
void Producer::Polling() {
    while (true) {
        producer_->poll(100);
    }
}

Когда я запускаю программу, я вижу, что все производственные запросы выполняются мгновенно:

Produce request sent for item0
Produce request sent for item1
...
Produce request sent for item999

и только после этого я медленно получаю подтверждения обратного вызова:

Callback received for item0
Callback received for item1
... // Takes a long time
Callback received for item999

Вопрос: как ускорить продюсера, чтобы я быстрее получал подтверждения обратного вызова?

...