У меня установлено успешное соединение между производителем и потребителем Kafka в кластере облачной платформы Google, установленное:
$ cd /usr/lib/kafka
$ bin/kafka-console-producer.sh config/server.properties --broker-list \
PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092 --topic test
и выполнение в новой оболочке
$ cd /usr/lib/kafka
$ bin/kafka-console-consumer.sh --bootstrap-server \
PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092 --topic test \
--from-beginning
Теперь я хочу отправить сообщения на сервер производителя Kafka, используя следующий скрипт на python:
from kafka import *
topic = 'test'
producer = KafkaProducer(bootstrap_servers='PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092',
api_version=(0,10))
producer.send(topic, b"Test test test")
Однако, это приводит к KafkaTimeoutError
:
"Failed to update metadata after %.1f secs." % (max_wait,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
Оглядываясь в Интернете, сказал мне подумать:
- раскомментирование
listeners=...
и advertised.listeners=...
в файле /usr/lib/kafka/config/server.properties
.
Однако listeners=PLAINTEXT://:9092
не работает и этот пост предлагает установить PLAINTEXT://<external-ip>:9092
.
Итак, я начал интересоваться доступом к серверу Kafka через внешний (статический) IP-адрес кластера GCP. Затем мы установили правило брандмауэра для доступа к порту (?) И разрешаем https доступ к кластеру. Но я не уверен, является ли это излишним решением проблемы.
Мне определенно нужны рекомендации для успешного подключения к серверу Kafka из скрипта python.