Экземпляр Consumer
от клиента confluent-kafka python всегда возвращает None при вызове poll()
с установленным временем ожидания.
topi c содержит некоторое сообщение, а официальный потребитель работает нормально:
$ vim ~/client.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=XXXXXXXXXX password="XXXXXXXXXX";
$ ~/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --topic my_topic --bootstrap-server somehost.:30742 --from-beginning --consumer.config ~/client.properties --group somenewgroup
msg1
msg2
msg3
Но Consumer.poll()
методы всегда вернуть нет. даже когда я изменяю пароль или хост на недопустимое значение, он также возвращает None.
Я установил регистратор для потребителя, но ничего не регистрируется.
код python выглядит следующим образом
consumer=Consumer({'sasl.mechanisms': "SCRAM-SHA-512",
'security.protocol': 'SASL_PLAINTEXT',
'sasl.username': 'XXXXXXXXXX',
'sasl.password': 'XXXXXXXXXX',
'bootstrap.servers': 'somehost.:30742',
"group.id":"somenewgroup",
'auto.offset.reset': 'beginning',
'logger':logger
},logger=logger)
consumer.subscribe(["my_topic"])
while True:
msg = consumer.poll(timeout=1.0)
print("poll success")
if msg is None:print("msg is None!")