Невозможно подключиться к узлу с идентификатором 1: [Работник]: Ошибка: ConnectionError («Нет подключения к узлу с идентификатором») - PullRequest
2 голосов
/ 25 января 2020

Я пытаюсь использовать robinhood / faust, но безуспешно!

Я уже создал продюсер, который успешно вставляет в исходный topi c в мой экземпляр localhost confluent-kafka!

но faust не может подключиться к localhost.

Мой app.py:

import faust
import base64
import random
from datetime import datetime


SOURCE_TOPIC="input_msgs"
TARGET_TOPIC="output_msgs"

app = faust.App("messages-stream", 
    broker="kafka://"+'localhost:9092',
    topic_partitions=1,
    store="memory://")

class OriginalMessage(faust.Record):
    msg: str


class TransformedMessage(faust.Record):
    msg_id: int
    msg_data: str
    msg_base64: str
    created_at: float 
    source_topic: str
    target_topic: str
    deleted: bool

topic = app.topic(SOURCE_TOPIC, value_type=OriginalMessage)
out_topic = app.topic(TARGET_TOPIC, partitions=1)

table = app.Table(
    "output_msgs",
    default=TransformedMessage,
    partitions=1,
    changelog_topic=out_topic,
)

print('Initializing Thread Processor...')


@app.agent(topic)
async def transformedmessage(messageevents):
    async for transfmessage in messageevents:
        try:

            table[transfmessage.msg_id] = random.randint(1, 999999)
            table[transfmessage.msg_data] = transfmessage.msg
            table[transfmessage.msg_base64] = base64.b64encode(transfmessage.msg)
            table[transfmessage.created_at] = datetime.now().isoformat()
            table[transfmessage.source_topic] = SOURCE_TOPIC
            table[transfmessage.target_topic] = TARGET_TOPIC
            table[transfmessage.deleted] = False

        except Exception as e:
            print(f"Error: {e}")


if __name__ == "__main__":
    app.main()

Ошибка

[2020-01-24 18:05:36,910] [55712] [ERROR] Unable connect to node with id 1: [Errno 8] nodename nor servname provided, or not known 
[2020-01-24 18:05:36,910] [55712] [ERROR] [^Worker]: Error: ConnectionError('No connection to node with id 1') 

    "No connection to node with id {}".format(node_id))
kafka.errors.ConnectionError: ConnectionError: No connection to node with id 1

I работаю с: faust -A app рабочий -l отладка

1 Ответ

0 голосов
/ 04 мая 2020

Я столкнулся с этой ошибкой, и, к счастью, я несколько ожидал ее, чтобы не было слишком сложно выяснить проблему.

В Confluent необходимо настроить домен, который должен использоваться для достижения всех брокеры Kafka, которые загружаются для вас. Я действительно не знал, насколько важным будет домен, поэтому я просто добавил что-то случайное, пока не застрял.

Конечно, я застрял здесь, как и вы, поэтому я запустил Wireshark, чтобы посмотреть, что происходило между Фаустом и bootstrap сервером. Оказывается, bootstrap разговор идет примерно так:

..............faust-1.10.4..PLAIN <-- client name
................PLAIN             <-- authentication protocol
....foobar.foobar.foobar2000.     <-- credentials!
b0.svs.cluster.local..#.......b1.svs.cluster.local..# <-- individual Kafka brokers

Они следуют шаблону доменных имен, которые я выбрал, и которые описаны в документации Confluent: https://docs.confluent.io/current/installation/operator/co-endpoints.html

Если эти имена не разрешаются, вы получаете здесь неопределенную ошибку, потому что, несмотря на успешную загрузку, клиент Kafka, следовательно, не смог подключиться к конечным точкам. Итак, выберите домен, которым вы фактически управляете, или поместите нужные ответы в локальный /etc/hosts или эквивалентный файл.

17  123.111.222.1 b0.svs.cluster.local
18  123.111.222.2 b1.svs.cluster.local

После перезапуска Faust соединение bootstrap и Kafka установилось успешно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...