Проблема
Я слежу за задержкой смещения Кафки, используя метрики, выводимые из конечной точки Spring Actuator Prometheus.В некоторых случаях, когда я выполнял большую работу по обратной засыпке, я заметил, что некоторые из моих узлов (6 экземпляров приложения) перестают обновлять показатель, пока все сообщения не будут использованы.Рассматриваемые узлы по-прежнему правильно обрабатывают сообщения (что подтверждается регистрацией и конечными результатами).При построении графиков моих метрик это проявляется в виде плоской линии в лаге смещения до самого конца, когда они выпадают из края обрыва.
Стоит также отметить, что в тех случаях, когда метрики обновляются в соответствии с ожиданиями, и мы выполняем развертывание приложения, мы можем наблюдать, как метрики переходят от ожидаемых действий к одному или нескольким плоским узлам., как описано выше.
Мне кажется, что это какое-то недетерминированное состояние гонки, возможно, это объясняется тем, как я настроил свое приложение?
![1](https://i.stack.imgur.com/kE9wv.png)
Context
У меня есть приложение, которое использует несколько тем.Все темы, кроме одной, имеют общий формат сообщения, и только одна из тем имеет другой формат, поскольку это тема «повторных попыток», поэтому сообщение отличается.Я также хотел применить фильтр только к обычным темам, поэтому для облегчения этого я создаю два отдельных компонента ConcurrentKafkaListenerContainerFactory
, например:
@Bean
public ConcurrentKafkaListenerContainerFactory<Long, MessageA> messageAContainerFactory(
ConsumerFactory<Long, MessageA> consumerFactory) { ... }
@Bean
public ConcurrentKafkaListenerContainerFactory<Long, MessageB> messageBContainerFactory(
ConsumerFactory<Long, MessageB> consumerFactory) { ... }
И затем я ссылаюсь на эти компоненты в соответствующих @KafkaListener
аннотации:
@KafkaListener(
topics = "message-a-topic",
containerFactory = "messageAContainerFactory")
public void consumeMessageA(MessageA messageA) { ... }
@KafkaListener(
topics = "message-B-topic",
containerFactory = "messageBContainerFactory")
public void consumeMessageB(MessageB messageB) { ... }
Кроме этих бобов в игре больше ничего нет.Конфигурация приложения для Kafka выглядит следующим образом:
spring:
kafka:
consumer:
auto-offset-reset: latest
max-poll-records: 10
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring:
deserializer:
value:
delegate:
class: org.springframework.kafka.support.serializer.JsonDeserializer
json:
value:
default:
type: com.example.MessageA
Я должен подчеркнуть, что я не заметил никаких проблем с функциональностью - сообщения, по-видимому, используются и обрабатываются, как и ожидалось.Это только влияет на способ обновления метрик.
Буду признателен за любой совет!