Я пытаюсь настроить docker-compose с помощью Kafka @ Wurstmeister.
СЦЕНАРИЙ: Я разрабатываю архитектуру нескольких микросервисов.В частности: у меня есть приложение весенней загрузки, которое отправляет JSON моему брокеру kafka.Служба Flask потребляет данные.Это работает, когда работает весь мыслительный докер.Я также могу отправлять данные в тему kafka в Docker.
КОД: Колба:
KafkaHost = "kafka:9092"
def initkafka():
# connect to Kafka server and pass the topic we want to consume
consumer = KafkaConsumer("TEST",
group_id='view',
bootstrap_servers=[Constants.KafkaHost]
)
KafkaConsumer(auto_offset_reset='latest',
enable_auto_commit=False)
KafkaConsumer(value_deserializer=lambda m: json.loads(m.dedoce('utf-8')))
KafkaConsumer(consumer_timeout_ms=1000)
return consumer
Docker Compose:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
networks:
- test-net
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
#KAFKA_ADVERTISED_HOST_NAME: 172.17.0.1
KAFKA_LISTENERS: PLAINTEXT://:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.17.0.1:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "TEST:1:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper
networks:
- test-net
ОШИБКА
Traceback (most recent call last):
File "run.py", line 1, in <module>
from controller import Controller
File "/app/controller/Controller.py", line 27, in <module>
consumer = KafkaConfig.initkafka()
File "/app/config/KafkaConfig.py", line 16, in initkafka
enable_auto_commit=False)
File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 324, in __init__
self._client = KafkaClient(metrics=self._metrics, **self.config)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 221, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 826, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
Я думаю, что это проблема конфигурации среды.Я прочитал документ wurstmeister, но не могу понять, что мне нужно настроить, чтобы мой сервис фляг нашел брокера kafka.В журналах говорится, что Кафка запущена и тема «ТЕСТ» создана.Нужно ли настраивать слушателей, например, с помощью ip, и порт в моей сети будет слушать kafka?Потому что в kafka docs advertised.listeners описывается как
Прослушиватели для публикации в ZooKeeper для использования клиентами, если они отличаются от свойства конфигурации прослушивателей.В средах IaaS это может отличаться от интерфейса, с которым связывается брокер.Если это не установлено, будет использоваться значение для слушателей.В отличие от слушателей недопустимо рекламировать метаадрес 0.0.0.0.