рдкафка потребителя не прекращается на близких - PullRequest
0 голосов
/ 23 января 2019

Я написал код приложения, в котором есть продюсер и потребительская часть одного и того же процесса.Иногда у меня возникают проблемы с грациозным отключением потребителя кафки.Производитель Kafka всегда корректно завершает работу.

глобальная конфигурация kafka

conf->set("metadata.broker.list", brokers, errstr);
conf->set("bootstrap.servers", brokers, errstr);
char tmp[16];
snprintf(tmp, sizeof(tmp), "%i", SIGINT);
conf->set("internal.termination.signal", tmp, errstr);
conf->set("topic.metadata.refresh.interval.ms","3000", errstr);
conf->set("group.id", group_id, errstr);
conf->set("client.id", m_kafka_client_id, errstr);
conf->set("auto.offset.reset", "earliest", errstr);
conf->set("rebalance_cb", rebalance_cb, errstr);
conf->set("statistics.interval.ms", "3000", errstr);
conf->set("event_cb", event_cb, errstr);
conf->set("enable.auto.commit","false", errstr);

нет специальной конфигурации для тем

обратная трассировка основного процесса (где основной процесс застревает навсегда)

#0  0x00007f319e23ef97 in pthread_join () from /lib64/libpthread.so.0
#1  0x00000000022a7c12 in thrd_join (thr=thr@entry=139850883684096, res=res@entry=0x0) at tinycthread.c:749
#2  0x000000000224b1ee in rd_kafka_destroy_app (blocking=1, rk=0x946fb00) at rdkafka.c:737
#3  rd_kafka_destroy (rk=0x946fb00) at rdkafka.c:751
#4  0x000000000223b34e in RdKafka::KafkaConsumerImpl::close (this=0x94612c0) at KafkaConsumerImpl.cpp:251

и поток

#0  0x00007f3199326f0d in poll () from /lib64/libc.so.6
#1  0x000000000226dfed in rd_kafka_transport_poll (rktrans=rktrans@entry=0x898ae40, tmout=tmout@entry=1)
at rdkafka_transport.c:1538
#2  0x000000000226e06b in rd_kafka_transport_io_serve (rktrans=0x898ae40, timeout_ms=timeout_ms@entry=1)
at rdkafka_transport.c:1397
#3  0x000000000225d8d5 in rd_kafka_broker_serve (rkb=rkb@entry=0x9655500, abs_timeout=49646174735)
at rdkafka_broker.c:2293
#4  0x000000000225e558 in rd_kafka_broker_consumer_serve (rkb=0x9655500) at rdkafka_broker.c:3199
#5  rd_kafka_broker_thread_main (arg=arg@entry=0x9655500) at rdkafka_broker.c:3311
#6  0x00000000022a7797 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#7  0x00007f319e23de25 in start_thread () from /lib64/libpthread.so.0
#8  0x00007f3199331bad in clone () from /lib64/libc.so.6

Я попытался изменить сигнал завершения с SIGINT, SIGIO на SIGKILL (что приводит к остановке всего процесса, поэтому не очень хорошо).

Я подозреваю, что следующее условие вызывает цикл и не завершает имя файла: rdkafka_broker.c, функция: rd_kafka_broker_thread_main

while (!rd_kafka_broker_terminating(rkb)) {    << this condition never becomes false
    ....
    if (rd_kafka_terminating(rkb->rkb_rk)) {   << this is correct see gdb
        ....
    }
}

(gdb) p rkb->rkb_refcnt
$11 = {lock = {__data = {__lock = 0, __count = 0, __owner = 0, __nusers = 0, __kind = 0, __spins = 0,
  __elision = 0, __list = {__prev = 0x0, __next = 0x0}}, __size = '\000' <repeats 39 times>, __align = 0}, v = 4}

(gdb) p rkb->rkb_rk->rk_terminate
$13 = {val = 1}
...