Невозможно подключиться к Kafka из Flask в докернизированной среде - PullRequest
0 голосов
/ 27 ноября 2018

Я пытаюсь создать приложение Flask, в котором в качестве интерфейса используется Kafka.Я использовал Python-коннектор, kafka-python и образ Docker для Kafka, spotify / kafkaproxy .

Ниже приведен файл docker-compose.

version: '3.3'
services:
  kafka:
    image: spotify/kafkaproxy
    container_name: kafka_dev
    ports:
      - '9092:9092'
      - '2181:2181'
    environment:
      - ADVERTISED_HOST=0.0.0.0
      - ADVERTISED_PORT=9092
      - CONSUMER_THREADS=1
      - TOPICS=PROFILE_CREATED,IMG_RATED
      - ZK_CONNECT=kafka7zookeeper:2181/root/path
  flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
      - '9000:5000'
    volumes:
      - ./flask-app:/app
    depends_on:
      - kafka

Ниже приведен фрагмент Python, который я использовал для подключения к kafka.Здесь я использовал псевдоним контейнера Kafka kafka для подключения, поскольку Docker позаботился о сопоставлении псевдонима с его IP-адресом.

from kafka import KafkaConsumer, KafkaProducer

TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:9092']

consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)

Я получил ошибку NoBrokersAvailable.Из этого я понял, что приложение Flask не может найти сервер Kafka.

Traceback (most recent call last):
  File "./app.py", line 11, in <module>
    consumer = KafkaConsumer("PROFILE_CREATED", bootstrap_servers=BOOTSTRAP_SERVERS)
  File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 340, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 219, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 819, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

Другие наблюдения:

  1. Мне удалось запустить ping kafka из контейнера Flask и получите пакеты из контейнера Kafka.
  2. Когда я запускаю приложение Flask локально, пытаясь подключиться к контейнеру Kafka, установив BOOTSTRAP_SERVERS = ['localhost:9092'], оно работает нормально.

1 Ответ

0 голосов
/ 27 ноября 2018

ОБНОВЛЕНИЕ

Как упомянуто cricket_007, учитывая, что вы используете docker-compose, представленный ниже, вы должны использовать kafka:29092 для подключения к Kafka из другого контейнера.Итак, ваш код будет выглядеть так:

from kafka import KafkaConsumer, KafkaProducer

TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:29092']

consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)

END UPDATE

Я бы порекомендовал вам использовать образы Кафки из Confluent Inc , ониесть все виды примеров установок с использованием docker-compose, которые готовы к использованию, и они всегда обновляют их.

Попробуйте это:

---
version: '2'
services:
zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000

kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
    - zookeeper
    ports:
    - 9092:9092
    environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
    - '9000:5000'
    volumes:
    - ./flask-app:/app

Я использовал это docker-compose.yml и добавил ваш сервис сверху Обратите внимание:

Используемая здесь конфигурация предоставляет порт 9092 для внешних подключений к брокеру, т. Е. Из вне сети докера.Это может быть от хоста, на котором запущен Docker, или, может быть, дальше, если у вас более сложная настройка.Если последнее имеет значение true, вам нужно изменить значение 'localhost' в KAFKA_ADVERTISED_LISTENERS на значение, которое разрешается на хост докера с этих удаленных клиентов

Убедитесь, что вы ознакомились с другими примерами, возможно,особенно полезно при переходе в производственную среду: https://github.com/confluentinc/cp-docker-images/tree/5.0.1-post/examples

Также стоит проверить:

Похоже, вам нужно указать api_version, чтобы избежать этой ошибки.Для более подробной информации проверьте здесь .

Версия 1.3.5 этой библиотеки (последняя версия для pypy) перечисляет только определенные версии API 0.8.0 до 0.10.1.Так что, если вы явно не укажете api_version как (0, 10, 1), попытка клиентской библиотеки обнаружить версию приведет к ошибке NoBrokersAvailable.

producer = KafkaProducer(
    bootstrap_servers=URL,
    client_id=CLIENT_ID,
    value_serializer=JsonSerializer.serialize,
    api_version=(0, 10, 1)
)

Это должно работать, что довольно интересно при установке api_versionслучайно решает проблему следующим образом:

Когда вы устанавливаете api_version, клиент не будет пытаться проверять брокеров на наличие информации о версии.Так что это операция зонда, которая терпит неудачу.Одно большое различие между соединениями зондов версии и общими соединениями состоит в том, что первый пытается подключиться только к одному интерфейсу для каждого соединения (для каждого брокера), тогда как последний - общая операция - будет непрерывно циклически проходить через все интерфейсы до тех пор, пока соединение не будет установлено.преуспевает.# 1411 исправляет это, переключая логику проверки версии, чтобы попытаться установить соединение на всех найденных интерфейсах.

Фактическая проблема описана здесь

...