AIOKafka: ранее работающий код завершается с ошибкой в ​​send_and_wait - PullRequest
0 голосов
/ 08 мая 2018

Я уже некоторое время пользуюсь AIOKafka, с этим у меня не было проблем, до сегодняшнего дня.

Странно TypeError появляется, когда я пытаюсь отправить сообщение, используя AIOKafkaProducer.send_and_wait. Я также разместил этот вопрос как проблему в репозитории AIOKafka github, но похоже, что они неактивны. Может быть, здесь кто-то может мне помочь.

вот код:

import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers="localhost:9092")

async def _initialize(prod, future):
    await prod.start()
    await prod.send_and_wait("main_topic", str.encode("hello!!"))

future = asyncio.Future()
task = asyncio.ensure_future(_initialize(producer, future))
loop.run_until_complete(task)
print("loop ended!")
loop.close()

вот сообщение об ошибке, которое я получаю:

yilmazali@yilmazali:~$ python3 aiokafkatest.py
Unexpected error in sender routine
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/aiokafka/producer/producer.py", line 374, in _sender_routine
    task.result()
  File "/usr/local/lib/python3.6/dist-packages/aiokafka/producer/producer.py", line 418, in _send_produce_req
    response = yield from self.client.send(node_id, request)
  File "/usr/local/lib/python3.6/dist-packages/aiokafka/client.py", line 415, in send
    request, expect_response=expect_response)
  File "/usr/local/lib/python3.6/dist-packages/aiokafka/conn.py", line 165, in send
    message = header.encode() + request.encode()
  File "/usr/local/lib/python3.6/dist-packages/kafka/util.py", line 159, in __call__
    return self.method()(self.target(), *args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/struct.py", line 42, in _encode_self
    [self.__dict__[name] for name in self.SCHEMA.names]
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in encode
    for i, field in enumerate(self.fields)
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in <listcomp>
    for i, field in enumerate(self.fields)
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 170, in encode
    [self.array_of.encode(item) for item in items]
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 170, in <listcomp>
    [self.array_of.encode(item) for item in items]
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in encode
    for i, field in enumerate(self.fields)
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in <listcomp>
    for i, field in enumerate(self.fields)
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 170, in encode
    [self.array_of.encode(item) for item in items]
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 170, in <listcomp>
    [self.array_of.encode(item) for item in items]
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in encode
    for i, field in enumerate(self.fields)
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 132, in <listcomp>
    for i, field in enumerate(self.fields)
  File "/usr/local/lib/python3.6/dist-packages/kafka/protocol/types.py", line 93, in encode
    return Int32.encode(len(value)) + value
TypeError: object of type '_io.BytesIO' has no len()

Я не внес изменений в структуру или библиотеки Кафки. Мой брокер кафки выглядит хорошо. Я могу создавать / потреблять сообщения с помощью сценариев оболочки.

У меня не было проблем с AIOKafka в течение последних 2-3 месяцев, приведенный выше код работал нормально. Из ниоткуда эта ошибка появилась, и мне интересно, в чем проблема.

Любая помощь будет оценена.

С наилучшими пожеланиями,

Али

-

обновление: мы запустили этот сегмент кода на компьютере друга, он работал нормально. Я объявил мою кафку снаружи, и она успешно написала в мою местную тему кафки с кодом выше. Версии библиотеки AIOKafka - 0.4.0 на обеих машинах. Также asyncio версии 3.4.3 на обеих машинах. Короче, проблема не в моей кафке или библиотеках. Что-то не так с моей машиной, но бог знает, что конкретно вызывает это.

Ответы [ 2 ]

0 голосов
/ 24 мая 2018

Может быть, вы обновили kafka-python до версии выше 1.3.5. Я так и сделал, и айокафка начала выходить из строя. Я вернулся на kafka-python 1.3.5, и, кажется, все в порядке

0 голосов
/ 08 мая 2018

Наконец-то сработало на моей машине. Я просто удалил и установил aiokafka модуль.

Хотя я не удовлетворен этим решением и хотел бы углубиться в суть проблемы, я рад, что могу продолжать выполнять свою работу сейчас.

Надеюсь, это поможет незнакомцам с такой же проблемой.

...