кафка с колбой.KafkaTimeoutError: не удалось обновить метаданные через 60 секунд - PullRequest
1 голос
/ 08 июля 2019

Как интегрировать кафку с колбой? Я сделал это, но при отправке сообщения возникла ошибка (producer_instance.send(topic_name, key=key_bytes, value=value_bytes)). Это говорит KafkaTimeoutError: Failed to update metadata after 60.0 secs.. Я публикую свои коды здесь.

    #views.py
    from app import app
    from db.mongo import Database
    from flask import jsonify
    import json
    from app.kafka import Producer

    @app.route('/')
    def index():
        kafka_producer = None
        results = get_data()
        if len(results) > 0:
            kafka_producer = Producer.connect_kafka_producer()
            Producer.publish_message(kafka_producer, 'raw_country_info', 'raw', str(results).strip())
        if kafka_producer is not None:
            kafka_producer.close()
        return json.dumps(results, default=str)

    def get_data():
        database = Database().get_db()
        results = []
        for doc in database["database"].get_collection('country_info').find().limit(10):
            results.append(doc)
        return results

    #Producer.py
    import json
    from time import sleep
    from bs4 import BeautifulSoup
    from kafka import KafkaConsumer, KafkaProducer

    def connect_kafka_producer():
        _producer = None
        try:
            _producer = KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0, 10))
        except Exception as ex:
            print('Exception while connecting Kafka')
            print(str(ex))
        finally:
            return _producer

    def publish_message(producer_instance, topic_name, key, value):
        try:
            key_bytes = bytes(key, encoding='utf-8')
            value_bytes = bytes(value, encoding='utf-8')
            print("---------", key_bytes, value_bytes)
            producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
            producer_instance.flush()
            print('Message published successfully.')
        except Exception as ex:
            print('Exception in publishing message')
            print(str(ex))
...