Кафка Python Потребитель - Конфлюент Кафка - PullRequest
0 голосов
/ 02 мая 2020

Я написал Python потребителя, использующего пакет confluent-kafka. После нескольких часов работы потребитель умирает со следующей ошибкой

cimpl.KafkaException:
KafkaError{code=_TIMED_OUT,val=-185,str="FindCoordinator response
error: Local: Timed out"}

Может кто-нибудь помочь мне понять, почему это происходит

**

  • Ниже приведена часть кода

**

 producer_conf = {
'bootstrap.servers': 'xxxxxxx',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': 'xxxxx',
'sasl.password': 'xxxx',
'ssl.ca.location':'xxxx',
'ssl.certificate.location': 'xxxx',
'queue.buffering.max.messages': 100000,
'queue.buffering.max.ms' : 1000,
'batch.num.messages': 500
}

p = Producer(**producer_conf)
target_topic='xxxxxx'

c = Consumer(kwargs)
source_topic='xxxx'
c.subscribe([source_topic])
while True:

    msg = c.poll(100) #I am consuming from a topic

    if msg is None:
        continue
    if msg.error():
        logging.error("error occurred during polling topic")
        logging.error(msg.error())
        raise KafkaException(msg.error())
        continue

    #logging.info("input msg form topic: ")
    #logging.info(msg.value())
    #msgDict = json.loads(msg.value())  # taking msg into dictionary
    try:
        p.produce(target_topic, msg.value(), callback=delivery_callback) #the message from the consumed topic is pushed to the target topic
        c.commit() #disabled auto commit, manually committing only when message pushed to the target topic
    except BufferError:
        sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                         len(p))
    except Exception as e:
        print(e)

    p.poll(0)
    #sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()
...