Невозможно отправлять сообщения Топи c в Кафке Python - PullRequest
0 голосов
/ 16 июня 2020

У меня есть код производителя, по которому я отправляю сообщения Kafka. До вчерашнего дня я мог отправлять сообщения. С сегодняшнего дня я не могу отправлять сообщения. Не уверен, что проблема с его версией совместима. Никаких сбоев или сообщений об ошибках, код выполняется, но не отправляет сообщения.

Ниже приведены версии модуля python,
kafka-python==2.0.1
Python 3.8.2

Ниже мой код:

from kafka import KafkaProducer
import logging
logging.basicConfig(level=logging.INFO)

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
producer.send('Jim_Topic', b'Message from PyCharm')
producer.send('Jim_Topic', key=b'message-two', value=b'This is Kafka-Python')

Любые предложения были бы полезны.

Я тоже пытаюсь регистрировать поведение, но не знаю, почему производитель закрывается,

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to 127.0.0.1:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
INFO:kafka.producer.kafka:Closing the Kafka producer with 0 secs timeout.
INFO:kafka.producer.kafka:Proceeding to force close the producer since pending requests could not be completed within timeout 0.
INFO:kafka.producer.kafka:Kafka producer closed

Process finished with exit code 0

Ответы [ 2 ]

0 голосов
/ 16 июня 2020

Добавление producer.flush() в конце помогло мне решить проблемы. Все незавершенные сообщения будут сброшены (доставлены) перед фактической фиксацией транзакции

from kafka import KafkaProducer
import logging
logging.basicConfig(level=logging.INFO)

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
producer.send('Jim_Topic', b'Message from PyCharm')
producer.send('Jim_Topic', key=b'message-two', value=b'This is Kafka-Python')
producer.flush()
0 голосов
/ 16 июня 2020

Можете ли вы попробовать следующий код. Я взял его из документации kafka- python и попробовал на моем локальном экземпляре Kafka

from kafka import KafkaProducer
import json
def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)


def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)


if __name__ == "__main__":
    target_topic = "Jim_Topic"
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'],retries=5, key_serializer=lambda x: json.dumps(x).encode("ascii"), value_serializer=lambda x: json.dumps(x).encode("ascii"))
    messages = [{key:"message_one",value:"Message from PyCharm"},{key:"message_two",value:"This is Kafka-Python"}]
    for msg in messages:
      producer.send(target_topic,msg).add_callback(on_send_success).add_errback(on_send_error)

    producer.flush(timeout=10) # this forcibly sends any messages that are stuck.
    producer.close(timeout=5)
...