Я написал C ++ потребитель для сообщений kafka, используя библиотеку cppkafka (обертка для librdkafka).У меня проблема при запуске программы.Иногда возникает проблема с ошибкой сегментации (дамп ядра) перед назначенной темой, но иногда она запускается без каких-либо ошибок.Ниже вы можете найти мой основной () код.
bool running = true;
int main() {
string topic = "topic";
string stop = "STOP";
int m_count = 0;
signal(SIGINT, [](int) { running = false; });
Configuration config = {
{ "metadata.broker.list", "kafka-1:19092"},
{"group.id", topic},
{"fetch.wait.max.ms", 10},
{"enable.auto.commit", false},
{"auto.offset.reset", "latest"}
};
Consumer consumer(config);
consumer.set_assignment_callback([](const TopicPartitionList& partitions) {
cout << "Got assigned: " << partitions << endl;
});
consumer.set_revocation_callback([](const TopicPartitionList& partitions) {
cout << "Got revoked: " << partitions << endl;
});
consumer.subscribe({ topic });
cout << "Consuming messages from topic " << topic << endl;
auto ms = std::chrono::microseconds(10);
while (running) {
Message msg = consumer.poll();
if (msg) {
if (msg.get_error()) {
if (!msg.is_eof()) {
cout << "[+] Received error notification: " << msg.get_error() << endl;
}
}
else {
m_count ++;
cout << msg.get_payload()<<" OFSET: "<< msg.get_offset() <<" TIME: "<<getMs() - msg.get_timestamp().get().get_timestamp().count()<< endl;
consumer.commit(msg);
}
}
}
}