Я застрял с проблемой, связанной с потребителем Kafka, использующим библиотеку python confluent-kafka.
CONTEXT
У меня есть тема Kafka об AWS EC2, которую я должен потреблять.
SCENARIO
Consumer Script (my_topic_consumer.py) использует confluent-kafka-python для создания потребителя (показано ниже) и подписки на тему «my_topic». Проблема заключается в том, что потребитель не может читать сообщения из кластера Kafka.
Все необходимые шаги безопасности выполнены:
1. SSL - протокол безопасности для потребителя и брокера.
2. В группу безопасности кластера добавлено IP-блок потребителя EC2.
#my_topic_consumer.py
from confluent_kafka import Consumer, KafkaError
c = Consumer({
'bootstrap.servers': 'my_host:my_port',
'group.id': 'my_group',
'auto.offset.reset': 'latest',
'security.protocol': 'SSL',
'ssl.ca.location': '/path/to/certificate.pem'
})
c.subscribe(['my_topic'])
try:
while True:
msg = c.poll(5)
if msg is None:
print('None')
continue
if msg.error():
print(msg)
continue
else:
#process_msg(msg) - Writes messages to a data file.
except KeyboardInterrupt:
print('Aborted by user\n')
finally:
c.close()
URLS
Хост-брокер: my_host
Порт: my_port
Идентификатор группы: my_group
КОМАНДЫ КОНСОЛИ
работает - при запуске сценария консоли-получателя я могу видеть данные:
kafka-console-consumer --bootstrap-server my_host:my_port --consumer.config client-ssl.properties --skip-message-on-error --topic my_topic | jq
Примечание: client-ssl.properties: указывает на файл JKS, в котором есть сертификаты.
Дальнейшая отладка в кластере Kafka (отдельный экземпляр EC2 от потребителя), я не смог увидеть регистрацию моегоПотребитель по моему group_id (my_group):
kafka-consumer-groups --botstrap-server my_host:my_port --command-config client-ssl.properties --descrive --group my_group
Это наводит меня на мысль, что потребитель не регистрируется в кластере, поэтому может быть сбой рукопожатия SSL? Как проверить это со стороны потребителя в python?
Примечание
- кластер находится за прокси-сервером (корпоративным), но я запускаю прокси-сервер на потребительском EC2 перед тестированием.
- запустил процесс через pm2, но не увидел никаких журналов ошибок, таких как тайм-ауты req и т. Д.
Можно ли как-то проверить, что создание Consumer не удалось определенным образом, и выяснить,первопричина? Любая помощь и отзывы приветствуются.