multiprocessing Python Клиент-клиент Kafka не получает сообщения - PullRequest
0 голосов
/ 06 августа 2020

a Python клиент-потребитель работал нормально, когда работал как автономный, но не мог получить сообщение, когда работал как многопроцессорный рабочий с той же конфигурацией.

Клиенты всегда печатают сообщение в блоке, где он получает None для msg. Очень признателен за любую помощь в диагностике этой проблемы.

Рабочий в основном выглядит так:

from multiprocessing import Process
...
class saListener(Process):
     def   __init__(self, n)
            self.ClientName = "saListener-" + str(n)
            ...
            schema_registry_client = SchemaRegistryClient(schema_registry_conf)
            value_avro_deserializer = AvroDeserializer(ccloud_lib.value_schema, schema_registry_client)
            conf["value.deserializer"] = value_avro_deserializer
            self.cons = DeserializingConsumer(conf)
            Process.__init__(self)

    def connect(self):
            self.cons.subscribe([self.topic])

    def run(self):
            while True:
                            msg = self.cons.poll(5.0)
                            if msg is None:
                                    print(self.ClientName + ":Waiting for message or event/error in poll()")

Контроллер выглядит так:

for n in range(instances):
                lnr_instance = saListener(n)
                lnr_instance.connect()
                lnr_instance.start()

Конфигурация клиента:

   "bootstrap.servers" : "srv1:909,srv2:909",
   "group.id" : "ainvil9_intraday_group",
   "debug" : "all",
   "max.poll.interval.ms" : "30000",
   "enable.auto.commit" : "true",
   "fetch.wait.max.ms" : "1000",
   "session.timeout.ms" : "10000",
   "auto.commit.interval.ms" : "500",
   "sasl.mechanism" : "GSSAPI",
   "security.protocol" : "SASL_PLAINTEXT",
   "sasl.kerberos.service.name" : "kafka",
   "ssl.ca.location" : "security/ca.cert.pem",
   "sasl.kerberos.kinit.cmd" : "kinit -R -p -kt security/kafka_ist_producer.keytab kafka_ist_producer@DMS",
   "sasl.kerberos.keytab" : "security/kafka_ist_producer.keytab",
   "sasl.kerberos.principal" : "kafka_ist_producer@DMS"

Похоже, потребители, поскольку многопроцессорные рабочие не смогли получить смещение:

%7|1596766163.070|HEARTBEAT|rdkafka#consumer-2| [thrd:main]: GroupCoordinator/9: Heartbeat for group "ainvil9_intraday_group" generation id 1
 %7|1596766163.070|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
 %7|1596766163.070|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
 %7|1596766163.070|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/9: Sent HeartbeatRequest (v3, 97 bytes @ 0, CorrId 12)
 %7|1596766163.070|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/9: Sent HeartbeatRequest (v3, 97 bytes @ 0, CorrId 12)
 %7|1596766163.073|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/9: Received HeartbeatResponse (v3, 6 bytes, CorrId 12, rtt 3.03ms)
 %7|1596766163.073|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/9: Received HeartbeatResponse (v3, 6 bytes, CorrId 12, rtt 3.08ms)
 %7|1596766163.181|COMMIT|rdkafka#consumer-3| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
 %7|1596766163.181|UNASSIGN|rdkafka#consumer-3| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
 %7|1596766163.573|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
 %7|1596766163.573|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
 %7|1596766163.573|COMMIT|rdkafka#consumer-4| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
 %7|1596766163.573|UNASSIGN|rdkafka#consumer-4| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
 %7|1596766163.573|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
 %7|1596766163.573|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
 %7|1596766163.682|COMMIT|rdkafka#consumer-3| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...