Аварийное переключение кластера Kafka заставляет поток пульса клиента потребителя закрывать контекст приложения Spring - PullRequest
0 голосов
/ 17 апреля 2020

У нас есть многокластерная настройка kafka (prod / DR) с репликацией кластера и мы используем сервис проверки работоспособности для динамического определения работоспособности кластера как во время запуска, так и динамически во время нормальной работы приложения. Клиентские приложения выполняются как весенняя загрузка + java 11, но я видел то же поведение, что и обычное старое весеннее приложение.

В приложении, если мы обнаружим какие-либо исключения типа RetriableException на публике производителя sh или опросе потребителей, мы динамически воссоздадим производителя / потребителя после выполнения проверки работоспособности, чтобы получить подробную информацию для самый здоровый кластер.

Однако я вижу следующую проблему, когда убиваю всех брокеров для кластера, к которому мы сейчас подключены (например, имитация полного отказа центра обработки данных для нашего кластера Prod).

Производители: После 60-секундной задержки исключение (kafka TimeoutException) выходит из источника, и наше приложение автоматически переключается при сбое и подключается к кластеру DR. Сообщения в полете, которые мы отправляли в кластер Prod во время сбоя, доставляются в порядке после повторного подключения.

Потребители: Любые приложения с потребителями страдают фатальными проблемами. Поток пульса потребителя не может подключиться ни к одному из брокеров в кластере, к которому мы были подключены (ожидается, так как я убил всех брокеров). Он повторяет попытку l oop в течение 60 секунд (так же, как и производитель), однако после истечения 60 секунд, никакие исключения не прерывают вызов опроса. Вместо этого похоже, что либо код kafka либо активно закрывает контекст весеннего приложения, либо уничтожает поток, который приводит к тому же результату, и наше приложение умирает! См. Трассировку стека ниже, где последняя строка - закрытие контекста пружины.

16:59:53.952 [Thread-2] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX, groupId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX] Group coordinator XXXXXXXXXXXXXXXXXXXXXXXXXXXX (id: XXXXXXXXXX rack: null) is unavailable or invalid, will attempt rediscovery
16:59:54.127 [Thread-2] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX, groupId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX] Discovered group coordinator XXXXXXXXXXXXXXXXXXXXXXXXXXXX (id: XXXXXXXXXX rack: null)
16:59:54.482 [Thread-2] WARN  o.a.k.c.NetworkClient - [Consumer clientId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX, groupId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX] Connection to node XXXXXXXXXX terminated during authentication. This may indicate that authentication failed due to invalid credentials.
16:59:54.482 [kafka-coordinator-heartbeat-thread | XXXXXXXXXXXXXXXXXXXXXXXXXXXX] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX, groupId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX] Group coordinator XXXXXXXXXXXXXXXXXXXXXXXXXXXX (id: XXXXXXXXXX rack: null) is unavailable or invalid, will attempt rediscovery
16:59:54.582 [Thread-2] WARN  o.a.k.c.NetworkClient - [Consumer clientId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX, groupId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX] Connection to node 4 could not be established. Broker may not be available.
16:59:55.797 [Thread-2] WARN  o.a.k.c.NetworkClient - [Consumer clientId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX, groupId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX] Connection to node 2 could not be established. Broker may not be available.
16:59:57.017 [Thread-2] WARN  o.a.k.c.NetworkClient - [Consumer clientId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX, groupId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX] Connection to node 5 could not be established. Broker may not be available.
16:59:58.249 [Thread-2] WARN  o.a.k.c.NetworkClient - [Consumer clientId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX, groupId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX] Connection to node 2 could not be established. Broker may not be available.
16:59:59.461 [Thread-2] WARN  o.a.k.c.NetworkClient - [Consumer clientId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX, groupId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX] Connection to node 5 could not be established. Broker may not be available.
17:00:00.674 [Thread-2] WARN  o.a.k.c.NetworkClient - [Consumer clientId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX, groupId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX] Connection to node 2 could not be established. Broker may not be available.
17:00:01.888 [Thread-2] WARN  o.a.k.c.NetworkClient - [Consumer clientId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX, groupId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX] Connection to node 5 could not be established. Broker may not be available.
17:00:03.102 [Thread-2] WARN  o.a.k.c.NetworkClient - [Consumer clientId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX, groupId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX] Connection to node 2 could not be established. Broker may not be available.
17:00:04.315 [Thread-2] WARN  o.a.k.c.NetworkClient - [Consumer clientId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX, groupId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX] Connection to node 5 could not be established. Broker may not be available.
17:00:05.528 [Thread-2] WARN  o.a.k.c.NetworkClient - [Consumer clientId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX, groupId=XXXXXXXXXXXXXXXXXXXXXXXXXXXX] Connection to node 2 could not be established. Broker may not be available.
Exception in thread "Thread-2" org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing the current consumed offsets
17:00:05.630 [Thread-0] INFO  o.s.c.a.AnnotationConfigApplicationContext - Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@2ef5e5e3: startup date [Fri Apr 17 16:58:22 BST 2020]; root of context hierarchy

Я еще не копался в коде kafka, чтобы точно понять, почему он это делает, но мне это кажется очень странным. Почему это поведение по умолчанию, при котором вы не можете подключиться ни к одному брокеру в кластере?

В конечном счете, это не сильно повлияет на нас, так как мы работаем в Docker экземпляре в Kubernetes, поэтому Закрытие контекста приложения приводит к смерти нашего модуля, и мы подключаемся к исправному кластеру аварийного восстановления при появлении нового модуля. Тем не менее, мне интересно, если кто-нибудь может прокомментировать это поведение - это намеренно?

...