ОБНОВЛЕНИЕ
Как упомянуто cricket_007, учитывая, что вы используете docker-compose, представленный ниже, вы должны использовать kafka:29092
для подключения к Kafka из другого контейнера.Итак, ваш код будет выглядеть так:
from kafka import KafkaConsumer, KafkaProducer
TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:29092']
consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)
END UPDATE
Я бы порекомендовал вам использовать образы Кафки из Confluent Inc , ониесть все виды примеров установок с использованием docker-compose, которые готовы к использованию, и они всегда обновляют их.
Попробуйте это:
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
flaskapp:
build: ./flask-app
container_name: flask_dev
ports:
- '9000:5000'
volumes:
- ./flask-app:/app
Я использовал это docker-compose.yml и добавил ваш сервис сверху Обратите внимание:
Используемая здесь конфигурация предоставляет порт 9092 для внешних подключений к брокеру, т. Е. Из вне сети докера.Это может быть от хоста, на котором запущен Docker, или, может быть, дальше, если у вас более сложная настройка.Если последнее имеет значение true, вам нужно изменить значение 'localhost' в KAFKA_ADVERTISED_LISTENERS на значение, которое разрешается на хост докера с этих удаленных клиентов
Убедитесь, что вы ознакомились с другими примерами, возможно,особенно полезно при переходе в производственную среду: https://github.com/confluentinc/cp-docker-images/tree/5.0.1-post/examples
Также стоит проверить:
Похоже, вам нужно указать api_version, чтобы избежать этой ошибки.Для более подробной информации проверьте здесь .
Версия 1.3.5 этой библиотеки (последняя версия для pypy) перечисляет только определенные версии API 0.8.0 до 0.10.1.Так что, если вы явно не укажете api_version как (0, 10, 1), попытка клиентской библиотеки обнаружить версию приведет к ошибке NoBrokersAvailable.
producer = KafkaProducer(
bootstrap_servers=URL,
client_id=CLIENT_ID,
value_serializer=JsonSerializer.serialize,
api_version=(0, 10, 1)
)
Это должно работать, что довольно интересно при установке api_versionслучайно решает проблему следующим образом:
Когда вы устанавливаете api_version, клиент не будет пытаться проверять брокеров на наличие информации о версии.Так что это операция зонда, которая терпит неудачу.Одно большое различие между соединениями зондов версии и общими соединениями состоит в том, что первый пытается подключиться только к одному интерфейсу для каждого соединения (для каждого брокера), тогда как последний - общая операция - будет непрерывно циклически проходить через все интерфейсы до тех пор, пока соединение не будет установлено.преуспевает.# 1411 исправляет это, переключая логику проверки версии, чтобы попытаться установить соединение на всех найденных интерфейсах.
Фактическая проблема описана здесь