Я использую confluent-kafka-python
и вижу, что он зависает бесконечно, когда я пытаюсь подключиться к брокеру, который не работает. Кажется, я не могу применить настройки тайм-аута, которые я нашел в документации:
from confluent_kafka import Consumer
conf = {'bootstrap.servers': f"{self.host}:{self.port}",
'group.id': "foo",
'auto.offset.reset': 'smallest',
'socket.timeout.ms':'2000', 'socket.max.fails':2,
'metadata.request.timeout.ms': 5000,
'reconnect.backoff.max.ms':'5000',
'api.version.request.timeout.ms':'5000',
#api.version.fallback.ms
'session.timeout.ms':'2000',
#heartbeat.interval.ms
'coordinator.query.interval.ms':'1000',
#max.poll.interval.ms
#auto.commit.interval.ms,
"debug":"generic, broker, topic, metadata",
}
try:
self.consumer = Consumer(conf)
, и я получаю в журнале:
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: broker in state TRY_CONNECT connecting
%7|1584702589.065|STATE|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1584702589.065|BROADCAST|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: Broadcasting state change
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Connecting to ipv4#x.x.x.x:6667 (plaintext) with socket 11
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:app]: Cluster connection already in progress: application metadata request
%7|1584702589.066|CONNECT|rdkafka#consumer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: application metadata request
%7|1584702589.067|BROKERFAIL|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%7|1584702589.067|FAIL|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Connect to ipv4#x.x.x.x:6667 failed: Connection refused (after 1ms in state CONNECT)
%7|1584702589.067|STATE|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Broker changed state CONNECT -> DOWN
%7|1584702589.067|BROADCAST|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: Broadcasting state change
%7|1584702589.067|STATE|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Broker changed state DOWN -> INIT
, и он просто продолжается и продолжается, и даже Ctrl- C не может остановить его, я должен использовать kill
.
Я ранее использовал kafka-python
, и он потерпел неудачу, когда не смог подключиться к брокеру.
Есть ли настройка, которая может привести к сбою confluent-kafka-python
, когда посредник недоступен?
Я видел этот вопрос Настройка тайм-аута соединения с брокером Kafka Streams и хотел убедиться, что моя ситуация то же самое?
Обновление: Еще одна связанная проблема: если на узле с двойным домом я настраиваю advertized.listeners на внутренний адрес и пытаюсь отправить сообщение с внешнего интерфейса, я снова сталкиваюсь с бесконечным l oop:
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1584727507.340|NODENAME|rdkafka#consumer-1| [thrd:main]: GroupCoordinator: Broker nodename changed from "" to "de-cn1:6667"
%7|1584727507.340|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Received CONNECT op
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|BROKERFAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Broker node update: (errno: Success)
%7|1584727507.340|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Closing connection due to nodename change (after 0ms in state TRY_CONNECT)
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> DOWN
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1001: broker in state TRY_CONNECT connecting
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> CONNECT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.359|BROKERFAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Host resolution failure: (errno: Bad address)
%7|1584727507.360|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state CONNECT -> DOWN
Таким образом, он не может разрешить внутреннее имя de-cn1
и не дает сбоя. Тот же самый вопрос, как заставить это терпеть неудачу на таких ошибках?