Потребитель Kafka получает тайм-аут, но сообщение создается - PullRequest
0 голосов
/ 06 января 2020

Я реализую клиент kafka как для производителя, так и для потребителя. Я создаю несколько текстовых сообщений (и вижу их в правом topi c при запуске файла примера 'kafka-console-consumer. sh'). Однако, когда я пытаюсь использовать свой собственный код, я получаю тайм-аут. Потребитель и топи c оба действительны. Любой совет? Использование librdkafka с Qt 5.10 в Ubuntu 16.04, если это имеет значение.

Код класса:

kafkaClient::kafkaClient(QObject *parent) : QObject(parent) {
    std::string brokers = "127.0.0.1:9092";
    std::string errstr;
    std::string mode;
    std::string debug;
    /*
    * Create configuration objects
    */
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("enable.partition.eof", "true", errstr);
    conf->set("metadata.broker.list", brokers, errstr);
    producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        exit(1);
    }
    std::cout << "% Created producer " << producer->name() << std::endl;
    consumer = RdKafka::Consumer::create(conf, errstr);
    std::cout << "% Created consumer " << consumer->name() << std::endl;

}

void kafkaClient::produce(QString message, QString topicStr) {
    int32_t partition = RdKafka::Topic::PARTITION_UA;
    auto topic = std::find_if(topics.begin(), topics.end(), [topicStr](RdKafka::Topic * top){
        return top->name().compare(topicStr.toStdString()) == 0;
    });
    if (topic != topics.end()) {
        RdKafka::ErrorCode resp = producer->produce(*topic, partition,
            RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
            (message.toUtf8().data()), message.size(), nullptr, nullptr);
    } else {
        std::cout << "Topic not found" << std::endl;
    }
}

void kafkaClient::consume() {
    consumer->poll(0);
    int32_t partition = RdKafka::Topic::PARTITION_UA;
    RdKafka::Message* message = consumer->consume(topics.last(), partition, 1000);
    const RdKafka::Headers *headers;
      switch (message->err()) {
        case RdKafka::ERR__TIMED_OUT:
          std::cout << "Timeout" << std::endl;
          break;

        case RdKafka::ERR_NO_ERROR:
          /* Real message */
          std::cout << "Read msg at offset " << message->offset() << std::endl;
          if (message->key()) {
            std::cout << "Key: " << *message->key() << std::endl;
          }
          headers = message->headers();
          if (headers) {
            std::vector<RdKafka::Headers::Header> hdrs = headers->get_all();
            for (size_t i = 0 ; i < hdrs.size() ; i++) {
              const RdKafka::Headers::Header hdr = hdrs[i];

              if (hdr.value() != NULL)
                printf(" Header: %s = \"%.*s\"\n",
                       hdr.key().c_str(),
                       (int)hdr.value_size(), (const char *)hdr.value());
              else
                printf(" Header:  %s = NULL\n", hdr.key().c_str());
            }
          }
          printf("%.*s\n",
            static_cast<int>(message->len()),
            static_cast<const char *>(message->payload()));
          break;

        case RdKafka::ERR__PARTITION_EOF:
            std::cerr << "Error" << message->errstr() << std::endl;
          break;

        case RdKafka::ERR__UNKNOWN_TOPIC:
        case RdKafka::ERR__UNKNOWN_PARTITION:
          std::cerr << "Consume failed: " << message->errstr() << std::endl;
          break;

        default:
          /* Errors */
          std::cerr << "Consume failed: " << message->errstr() << std::endl;
      }

    std::cout << "here" << std::endl;
}

void kafkaClient::createTopic(QString topicStr) {

    int32_t partition = RdKafka::Topic::PARTITION_UA;

    std::string errstr;
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    RdKafka::Topic *topic = RdKafka::Topic::create(producer, topicStr.toStdString(), tconf, errstr);
    if (!topic) {
        std::cerr << "Failed to create topic: " << errstr << std::endl;
    }
    topics.append(topic);

    RdKafka::ErrorCode resp = consumer->start(topics.last(), 0, 0);
    if (resp != RdKafka::ERR_NO_ERROR) {
      std::cerr << "Failed to start consumer: " <<
    RdKafka::err2str(resp) << std::endl;
      exit(1);
    }
}

Основной код

kafkaClient c(nullptr);
c.createTopic("test");
c.produce("hello kafka from qt" + QString::number(QTime::currentTime().second()), "test");
QThread::sleep(1);
c.consume();
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...