a Python клиент-потребитель работал нормально, когда работал как автономный, но не мог получить сообщение, когда работал как многопроцессорный рабочий с той же конфигурацией.
Клиенты всегда печатают сообщение в блоке, где он получает None для msg. Очень признателен за любую помощь в диагностике этой проблемы.
Рабочий в основном выглядит так:
from multiprocessing import Process
...
class saListener(Process):
def __init__(self, n)
self.ClientName = "saListener-" + str(n)
...
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
value_avro_deserializer = AvroDeserializer(ccloud_lib.value_schema, schema_registry_client)
conf["value.deserializer"] = value_avro_deserializer
self.cons = DeserializingConsumer(conf)
Process.__init__(self)
def connect(self):
self.cons.subscribe([self.topic])
def run(self):
while True:
msg = self.cons.poll(5.0)
if msg is None:
print(self.ClientName + ":Waiting for message or event/error in poll()")
Контроллер выглядит так:
for n in range(instances):
lnr_instance = saListener(n)
lnr_instance.connect()
lnr_instance.start()
Конфигурация клиента:
"bootstrap.servers" : "srv1:909,srv2:909",
"group.id" : "ainvil9_intraday_group",
"debug" : "all",
"max.poll.interval.ms" : "30000",
"enable.auto.commit" : "true",
"fetch.wait.max.ms" : "1000",
"session.timeout.ms" : "10000",
"auto.commit.interval.ms" : "500",
"sasl.mechanism" : "GSSAPI",
"security.protocol" : "SASL_PLAINTEXT",
"sasl.kerberos.service.name" : "kafka",
"ssl.ca.location" : "security/ca.cert.pem",
"sasl.kerberos.kinit.cmd" : "kinit -R -p -kt security/kafka_ist_producer.keytab kafka_ist_producer@DMS",
"sasl.kerberos.keytab" : "security/kafka_ist_producer.keytab",
"sasl.kerberos.principal" : "kafka_ist_producer@DMS"
Похоже, потребители, поскольку многопроцессорные рабочие не смогли получить смещение:
%7|1596766163.070|HEARTBEAT|rdkafka#consumer-2| [thrd:main]: GroupCoordinator/9: Heartbeat for group "ainvil9_intraday_group" generation id 1
%7|1596766163.070|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1596766163.070|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
%7|1596766163.070|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/9: Sent HeartbeatRequest (v3, 97 bytes @ 0, CorrId 12)
%7|1596766163.070|SEND|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/9: Sent HeartbeatRequest (v3, 97 bytes @ 0, CorrId 12)
%7|1596766163.073|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/9: Received HeartbeatResponse (v3, 6 bytes, CorrId 12, rtt 3.03ms)
%7|1596766163.073|RECV|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator/9: Received HeartbeatResponse (v3, 6 bytes, CorrId 12, rtt 3.08ms)
%7|1596766163.181|COMMIT|rdkafka#consumer-3| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1596766163.181|UNASSIGN|rdkafka#consumer-3| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
%7|1596766163.573|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1596766163.573|COMMIT|rdkafka#consumer-2| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1596766163.573|COMMIT|rdkafka#consumer-4| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1596766163.573|UNASSIGN|rdkafka#consumer-4| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
%7|1596766163.573|UNASSIGN|rdkafka#consumer-2| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
%7|1596766163.573|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "ainvil9_intraday_group": unassign done in state up (join state wait-assign-rebalance_cb): without new assignment: OffsetCommit done (__NO_OFFSET)
%7|1596766163.682|COMMIT|rdkafka#consumer-3| [thrd:main]: OffsetCommit for -1 partition(s): cgrp auto commit timer: returned: Local: No offset stored