Не удается получить доступ к сообщениям от конфлюентной кафки на EC2 - PullRequest
0 голосов
/ 11 октября 2018

Confluent Kafka 5.0.0 был установлен на AWS EC2 с публичным IP, скажем, 54.XX.XX.XX Открыт порт 9092 на компьютере EC2 с 0.0.0.0

В / etc / kafka / server.properties У меня есть

advertised.listeners=PLAINTEXT://54.XX.XX.XX:9092  
listeners=PLAINTEXT://0.0.0.0:9092

В /etc/kafka/producer.properties У меня bootstrap.servers=0.0.0.0:9092

на локальной машине В /etc/kafka/consumer.properties У меня bootstrap.servers=54.XX.XX.XX:9092

В EC2,запустил kafka 'confluent start' и создал 'mytopic'

Мой код файла временной базы производителя пробы выглядит как (соответствующая часть):

from confluent_kafka import Producer
broker = '54.XX.XX.XX'
topic = 'mytopic'
    p = Producer({'bootstrap.servers': broker})

    for data in dictList:
        p.poll(0)
        sendme = json.dumps(data)
        p.produce(topic, sendme.encode('utf-8'), callback=delivery_report)

    p.flush()

Похоже, что сообщения пишутся в 'mytopic'в потоке кафки в EC2.Я вижу эти сообщения в 'kafkacat -b 54.XX.XX.XX -t mytopic' на EC2.

Но я не могу получить доступ к этим сообщениям с локального компьютера как простой пользователь печати сообщений,с кодом, как показано ниже:

from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import sys

broker = '54.XX.XX.XX'
topic = 'mytopic'
group = 'mygroup'
     c = Consumer({
         'bootstrap.servers': broker,
         'group.id': group,
         'session.timeout.ms': 6000,
         'default.topic.config': {
             'auto.offset.reset': 'smallest'
         }
     })
     basic_consume_loop(c,[topic])

def basic_consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('{} [{}] reached end at offset {}\n'.format(msg.topic(), msg.partition(), msg.offset()))
                    data_process()
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        print("Shutting down the consumer")
        consumer.close()

Он просто зависает, я пропустил какие-либо настройки?

1 Ответ

0 голосов
/ 15 октября 2018

Кажется, что следующие шаги работают.

На локальном компьютере и на компьютере EC2 в /etc/kakfa/server.properties установите

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://54.XX.XX.XX:9092

На локальном компьютере в / etc /kakfa / seller.properties set

bootstrap.servers=0.0.0.0:9092

на машине EC2, в /etc/kakfa/producer.properties set

bootstrap.servers=localhost:9092

как на локальной машине, так и на машине EC2, в / etc / kakfa/consumer.properties set

bootstrap.servers=0.0.0.0:9092
group.id=mygroup

Используйте 'confluent-start', чтобы запустить все необходимые демоны на удаленной машине EC2.На локальной машине Confluent НЕ работает.

На локальном компьютере (для скрытия IP, необязательно):

export KAFKA_PRODUCER_IP=54.XX.XX.XX

При этом производитель с локального компьютера может отправлять сообщения на удаленный компьютер EC2 Kafka следующим образом:

broker = os.environ['KAFKA_PRODUCER_IP'] + ':9092'
topic = 'mytopic'
p = Producer({'bootstrap.servers': broker})

С локального компьютера сообщения могут быть получены с удаленной кафки EC2 следующим образом:

broker = os.environ['KAFKA_PRODUCER_IP'] + ':9092'
topic = 'mytopic'
group = 'mygroup'
     c = Consumer({
         'bootstrap.servers': broker,
         'group.id': group,
         'session.timeout.ms': 6000,
         'default.topic.config': {
             'auto.offset.reset': 'smallest'
         }
     })

Эти шаги, кажется, работают.Там могут быть некоторые увольнения, если это так, обязательно укажите.

...