потребитель confluent-kafka-python не может читать сообщения - PullRequest
0 голосов
/ 07 октября 2019

Я застрял с проблемой, связанной с потребителем Kafka, использующим библиотеку python confluent-kafka.

CONTEXT
У меня есть тема Kafka об AWS EC2, которую я должен потреблять.

SCENARIO
Consumer Script (my_topic_consumer.py) использует confluent-kafka-python для создания потребителя (показано ниже) и подписки на тему «my_topic». Проблема заключается в том, что потребитель не может читать сообщения из кластера Kafka.

Все необходимые шаги безопасности выполнены:
1. SSL - протокол безопасности для потребителя и брокера.
2. В группу безопасности кластера добавлено IP-блок потребителя EC2.

#my_topic_consumer.py
from confluent_kafka import Consumer, KafkaError

c = Consumer({
  'bootstrap.servers': 'my_host:my_port',
  'group.id': 'my_group',
  'auto.offset.reset': 'latest',
  'security.protocol': 'SSL',
  'ssl.ca.location': '/path/to/certificate.pem'
})

c.subscribe(['my_topic'])

try:
  while True:
    msg = c.poll(5)
      if msg is None:
        print('None')
        continue
      if msg.error():
        print(msg)
        continue
      else:
        #process_msg(msg) - Writes messages to a data file.

except KeyboardInterrupt:
  print('Aborted by user\n')

finally:
  c.close()

URLS
Хост-брокер: my_host
Порт: my_port
Идентификатор группы: my_group

КОМАНДЫ КОНСОЛИ
работает - при запуске сценария консоли-получателя я могу видеть данные:

kafka-console-consumer --bootstrap-server my_host:my_port --consumer.config client-ssl.properties --skip-message-on-error --topic my_topic | jq

Примечание: client-ssl.properties: указывает на файл JKS, в котором есть сертификаты.

Дальнейшая отладка в кластере Kafka (отдельный экземпляр EC2 от потребителя), я не смог увидеть регистрацию моегоПотребитель по моему group_id (my_group):

kafka-consumer-groups --botstrap-server my_host:my_port --command-config client-ssl.properties --descrive --group my_group

Это наводит меня на мысль, что потребитель не регистрируется в кластере, поэтому может быть сбой рукопожатия SSL? Как проверить это со стороны потребителя в python?

Примечание
- кластер находится за прокси-сервером (корпоративным), но я запускаю прокси-сервер на потребительском EC2 перед тестированием.
- запустил процесс через pm2, но не увидел никаких журналов ошибок, таких как тайм-ауты req и т. Д.

Можно ли как-то проверить, что создание Consumer не удалось определенным образом, и выяснить,первопричина? Любая помощь и отзывы приветствуются.

...