В моем приложении я создаю одного производителя и одного потребителя.
conf->set("dr_cb", delivery_cb, errstr);
conf->set("event_cb", event_cb, errstr);
RdKafka::Producer::create(conf.get(), errstr)
conf->set("log_level", "0", errstr);
conf->set("group.id", group_id, errstr);
conf->set("client.id", m_kafka_client_id, errstr);
conf->set("auto.offset.reset", "earliest", errstr);
conf->set("rebalance_cb", rebalance_cb, errstr);
conf->set("statistics.interval.ms", "3000", errstr);
conf->set("event_cb", event_cb, errstr);
RdKafka::KafkaConsumer::create(conf.get(), errstr)
тогда я пытаюсь получить метаданные следующим образом
err = _consumer->metadata(false, nullptr, &metadata, METADATA_TIMEOUT);
std::unique_ptr<RdKafka::Metadata> metadata_uptr(metadata); // Handover the raw pointer to the unique_ptr now
если по какой-то причине связь с брокером не работает, я получаю сообщение об ошибке, тогда я получаю, что производитель и потребитель удаляются с помощью unique_ptr -> destructor.
и затем это зацикливается до тех пор, пока приложение успешно не подключится к брокерам.
Что я заметил, так это то, что я вижу, как создается множество потоков, и остаюсь там. в какой-то момент счет достиг 2000 потоков.
Как правильно очистить Кафку?
темы застряли здесь
#0 0x00007f6acc031cf2 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1 0x00000000004b8e65 in cnd_timedwait_ms (cnd=cnd@entry=0xab41b8, mtx=mtx@entry=0xab4190, timeout_ms=<optimized out>) at tinycthread.c:501
#2 0x00000000004853aa in rd_kafka_q_serve (rkq=0xab4190, timeout_ms=<optimized out>, max_cnt=max_cnt@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_CALLBACK,
callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:440
#3 0x000000000045a43c in rd_kafka_thread_main (arg=arg@entry=0xabf760) at rdkafka.c:1227
#4 0x00000000004b8c07 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#5 0x00007f6acc02de25 in start_thread () from /lib64/libpthread.so.0
#6 0x00007f6acaa7834d in clone () from /lib64/libc.so.6
Я также попытался вызвать следующую процедуру при ошибке, но без помощи ...
RdKafka::wait_destroyed(5000);