Доступ к серверу производителя Kafka через скрипт Python на GCP - PullRequest
0 голосов
/ 13 марта 2019

У меня установлено успешное соединение между производителем и потребителем Kafka в кластере облачной платформы Google, установленное:

$ cd /usr/lib/kafka
$ bin/kafka-console-producer.sh config/server.properties --broker-list \
PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092  --topic test

и выполнение в новой оболочке

$ cd /usr/lib/kafka
$ bin/kafka-console-consumer.sh --bootstrap-server \
PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092 --topic test \
 --from-beginning

Теперь я хочу отправить сообщения на сервер производителя Kafka, используя следующий скрипт на python:

from kafka import *

topic = 'test'
producer = KafkaProducer(bootstrap_servers='PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092', 
api_version=(0,10))

producer.send(topic, b"Test test test")

Однако, это приводит к KafkaTimeoutError:

"Failed to update metadata after %.1f secs." % (max_wait,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

Оглядываясь в Интернете, сказал мне подумать:

  • раскомментирование listeners=... и advertised.listeners=... в файле /usr/lib/kafka/config/server.properties.

Однако listeners=PLAINTEXT://:9092 не работает и этот пост предлагает установить PLAINTEXT://<external-ip>:9092.

Итак, я начал интересоваться доступом к серверу Kafka через внешний (статический) IP-адрес кластера GCP. Затем мы установили правило брандмауэра для доступа к порту (?) И разрешаем https доступ к кластеру. Но я не уверен, является ли это излишним решением проблемы.

Мне определенно нужны рекомендации для успешного подключения к серверу Kafka из скрипта python.

Ответы [ 2 ]

1 голос
/ 14 марта 2019

Вам необходимо установить advertised.listeners на адрес, к которому подключается ваш клиент.

Подробнее: https://rmoff.net/2018/08/02/kafka-listeners-explained/

0 голосов
/ 16 марта 2019

Спасибо, Робин!Ссылка, которую вы разместили, была очень полезна для поиска следующих рабочих конфигураций.

Несмотря на то, что SimpleProducer кажется устаревшим подходом, следующие настройки наконец-то сработали для меня:

Python-скрипт:

from kafka import *
topic = 'test'
kafka = KafkaClient('[project-name]-w-0.c.[cluster-id].internal:9092')
producer = SimpleProducer(kafka)

message = "Test"
producer.send_messages(topic, message.encode('utf-8'))

и раскомментируйте в файле /usr/lib/kafka/config/server.properties:

listeners=PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092
advertised.listeners=PLAINTEXT://[project-name]-w-0.c.[cluster-id].internal:9092
...