Метаданные PyKafka в байтах вместо строк - PullRequest
0 голосов
/ 11 мая 2018

Я вижу необычное поведение с PyKafka, клиентом, который я только недавно начал использовать.

Ошибка следующая:

Failed to connect newly created broker for b'4758e4ee1af6':9092
{0: <pykafka.broker.Broker at 0x7f319e19be10 (host=b'4758e4ee1af6',port=9092, id=0)>}

Источник ошибки в следующих строках:

    self.client = KafkaClient(hosts=BROKER_ADDRESS, broker_version="0.10.1.0")
consumer = self.client.topics[bytes(self.input_topic,"UTF-8")].get_balanced_consumer(
        consumer_group=bytes(self.consumer_group,"UTF-8"),
        auto_commit_enable=True
    )

Отладка. Я видел, что клиент использует правильный IP-адрес строки для подключения к начальному брокеру, но при получении списка брокеров их IP-адреса являются двоичными, и когда PyKafka пытается подключиться снова, чтобы создать потребителя, эти IP-адреса явно не не работает.

Другая проблема, возможно связанная с этим, заключается в том, что мне нужно самостоятельно преобразовывать имена тем и групп потребителей в байты (как и в случае с другими клиентами), но все примеры в документации показывают использование строк.

Версия брокера Kafka: 0.10.1.0 Версия PyKafka: 2.7.0

Ответы [ 2 ]

0 голосов
/ 15 мая 2018

Проверьте конфигурацию advertised.listeners вашего брокера - она ​​определяет имена хостов, которые будут отправлены ZooKeeper и далее клиентам pykafka во время инициализации Cluster pykafka. Возможно, Docker повреждает эту информацию, поэтому вы можете переопределить ее, используя advertised.listeners. Из документации :

Прослушиватели для публикации в ZooKeeper для использования клиентами, если они отличаются от свойства конфигурации listeners. В средах IaaS это может отличаться от интерфейса, с которым связывается брокер.

Что касается проблемы с байтами / строками, последняя версия разработки pykafka принимает строки или байтов для имен тем и групп потребителей в качестве удобства программиста. Для более старых версий вам необходимо преобразовать строковые аргументы в байты, используя следующую технику:

topic_name = str_topic_name.encode('ascii')
0 голосов
/ 14 мая 2018

Хорошо, я был полностью введен в заблуждение: это был не IP, а имя хоста в base64 (генерируется Docker).

...