Я реализую клиент kafka как для производителя, так и для потребителя. Я создаю несколько текстовых сообщений (и вижу их в правом topi c при запуске файла примера 'kafka-console-consumer. sh'). Однако, когда я пытаюсь использовать свой собственный код, я получаю тайм-аут. Потребитель и топи c оба действительны. Любой совет? Использование librdkafka с Qt 5.10 в Ubuntu 16.04, если это имеет значение.
Код класса:
kafkaClient::kafkaClient(QObject *parent) : QObject(parent) {
std::string brokers = "127.0.0.1:9092";
std::string errstr;
std::string mode;
std::string debug;
/*
* Create configuration objects
*/
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("enable.partition.eof", "true", errstr);
conf->set("metadata.broker.list", brokers, errstr);
producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
consumer = RdKafka::Consumer::create(conf, errstr);
std::cout << "% Created consumer " << consumer->name() << std::endl;
}
void kafkaClient::produce(QString message, QString topicStr) {
int32_t partition = RdKafka::Topic::PARTITION_UA;
auto topic = std::find_if(topics.begin(), topics.end(), [topicStr](RdKafka::Topic * top){
return top->name().compare(topicStr.toStdString()) == 0;
});
if (topic != topics.end()) {
RdKafka::ErrorCode resp = producer->produce(*topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
(message.toUtf8().data()), message.size(), nullptr, nullptr);
} else {
std::cout << "Topic not found" << std::endl;
}
}
void kafkaClient::consume() {
consumer->poll(0);
int32_t partition = RdKafka::Topic::PARTITION_UA;
RdKafka::Message* message = consumer->consume(topics.last(), partition, 1000);
const RdKafka::Headers *headers;
switch (message->err()) {
case RdKafka::ERR__TIMED_OUT:
std::cout << "Timeout" << std::endl;
break;
case RdKafka::ERR_NO_ERROR:
/* Real message */
std::cout << "Read msg at offset " << message->offset() << std::endl;
if (message->key()) {
std::cout << "Key: " << *message->key() << std::endl;
}
headers = message->headers();
if (headers) {
std::vector<RdKafka::Headers::Header> hdrs = headers->get_all();
for (size_t i = 0 ; i < hdrs.size() ; i++) {
const RdKafka::Headers::Header hdr = hdrs[i];
if (hdr.value() != NULL)
printf(" Header: %s = \"%.*s\"\n",
hdr.key().c_str(),
(int)hdr.value_size(), (const char *)hdr.value());
else
printf(" Header: %s = NULL\n", hdr.key().c_str());
}
}
printf("%.*s\n",
static_cast<int>(message->len()),
static_cast<const char *>(message->payload()));
break;
case RdKafka::ERR__PARTITION_EOF:
std::cerr << "Error" << message->errstr() << std::endl;
break;
case RdKafka::ERR__UNKNOWN_TOPIC:
case RdKafka::ERR__UNKNOWN_PARTITION:
std::cerr << "Consume failed: " << message->errstr() << std::endl;
break;
default:
/* Errors */
std::cerr << "Consume failed: " << message->errstr() << std::endl;
}
std::cout << "here" << std::endl;
}
void kafkaClient::createTopic(QString topicStr) {
int32_t partition = RdKafka::Topic::PARTITION_UA;
std::string errstr;
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
RdKafka::Topic *topic = RdKafka::Topic::create(producer, topicStr.toStdString(), tconf, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
}
topics.append(topic);
RdKafka::ErrorCode resp = consumer->start(topics.last(), 0, 0);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to start consumer: " <<
RdKafka::err2str(resp) << std::endl;
exit(1);
}
}
Основной код
kafkaClient c(nullptr);
c.createTopic("test");
c.produce("hello kafka from qt" + QString::number(QTime::currentTime().second()), "test");
QThread::sleep(1);
c.consume();