Я пытаюсь установить безопасную связь между сервером Apache Kafka и клиентом. Я создал клиентские и серверные хранилища и хранилище ключей и выполнил соответствующие настройки, перейдя по этой ссылке - https://www.confluent.io/blog/apache-kafka-security-authorization-authentication-encryption/
Это мой следующий код:
from kafka import KafkaProducer, KafkaConsumer
producer = KafkaProducer(bootstrap_servers='cylc-vm:9092',
security_protocol='SSL',
ssl_cafile='CARoot.pem',
ssl_certfile='certificate.pem',
ssl_keyfile='key.pem')
producer.send('foobar1', b'some_message_bytes')
producer.flush()
consumer = KafkaConsumer('foobar1', auto_offset_reset='earliest', bootstrap_servers='cylc-vm:9092')
for each in consumer:
print(each)
В приведенном выше коде, когда потребитель пытается подключиться, он выдает следующее исключение:
Traceback (most recent call last):
File "test.py", line 14, in <module>
consumer = KafkaConsumer('foobar1', auto_offset_reset='earliest', bootstrap_servers='cylc-vm:9092')
File "/usr/lib/python3.6/site-packages/kafka/consumer/group.py", line 324, in __init__
self._client = KafkaClient(metrics=self._metrics, **self.config)
File "/usr/lib/python3.6/site-packages/kafka/client_async.py", line 221, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/usr/lib/python3.6/site-packages/kafka/client_async.py", line 826, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
Целью удаления аргументов ssl из класса Consumer является проверка правильности работы программы установки SSL. Я ожидал какого-то исключения SSL Authenticate, однако выброшенное исключение - это нечто другое. Я что-то здесь упустил или это ожидаемое поведение? И это показывает, что SSL работает здесь нормально?
Спасибо.