Я пытаюсь запустить приложение flask через docker compose .. наряду с kafka. Я запускаю приложение flask и consumer.py вместе. когда я вызываю файл provider.py через API flask, он, похоже, отправляет данные, а потребитель ничего не получает. Поскольку я одновременно запускаю и приложение flask, и файл потребителя .. Я также не могу просмотреть журналы ... Может ли какая-то помощь по телу ..
мой docker -компонентный файл
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: "zoo1"
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: "kafka1"
ports:
- "9092:9092"
expose:
- "9093"
depends_on:
- zookeeper
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
volumes:
- /var/run/docker.sock:/var/run/docker.sock
backend:
build: .
image: backend:v1
command: bash -c "python3 run.py run && python run.py consumer"
container_name: backend
restart: always
volumes:
- .:/app
ports:
- '5000:5000'
depends_on:
- mongodb
- neodb
- elasticsearch
- kafka
мой продюсер.py
from kafka import KafkaProducer
from json import dumps
class Producer:
def __init__(self):
pass
def producer(self, topic,data):
producer = KafkaProducer(
value_serializer=lambda m: dumps(m).encode('utf-8'),
bootstrap_servers=['kafka:9092','kafka:9093','172.17.0.1:32783','172.17.0.1:32782','172.17.0.1:32781'])
producer.send(topic, data)
мой потребитель.py
from kafka import KafkaConsumer
from json import loads
import os
class Consumer:
def consumer(self,topic):
print("hello")
consumer = KafkaConsumer(
topic,
auto_offset_reset='latest',
enable_auto_commit=True,
group_id='my-group-1',
value_deserializer=lambda m: loads(m.decode('utf-8')),
bootstrap_servers=['kafka:9092','kafka:9093','172.17.0.1:32783','172.17.0.1:32782','172.17.0.1:32781'])
while True:
dirName = "bad"
if not os.path.exists(dirName):
os.mkdir(dirName)
for m in consumer:
pass