Показатели Кафки не обновляются корректно - PullRequest
0 голосов
/ 31 мая 2019

Проблема

Я слежу за задержкой смещения Кафки, используя метрики, выводимые из конечной точки Spring Actuator Prometheus.В некоторых случаях, когда я выполнял большую работу по обратной засыпке, я заметил, что некоторые из моих узлов (6 экземпляров приложения) перестают обновлять показатель, пока все сообщения не будут использованы.Рассматриваемые узлы по-прежнему правильно обрабатывают сообщения (что подтверждается регистрацией и конечными результатами).При построении графиков моих метрик это проявляется в виде плоской линии в лаге смещения до самого конца, когда они выпадают из края обрыва.

Стоит также отметить, что в тех случаях, когда метрики обновляются в соответствии с ожиданиями, и мы выполняем развертывание приложения, мы можем наблюдать, как метрики переходят от ожидаемых действий к одному или нескольким плоским узлам., как описано выше.

Мне кажется, что это какое-то недетерминированное состояние гонки, возможно, это объясняется тем, как я настроил свое приложение?

1

Context

У меня есть приложение, которое использует несколько тем.Все темы, кроме одной, имеют общий формат сообщения, и только одна из тем имеет другой формат, поскольку это тема «повторных попыток», поэтому сообщение отличается.Я также хотел применить фильтр только к обычным темам, поэтому для облегчения этого я создаю два отдельных компонента ConcurrentKafkaListenerContainerFactory, например:

@Bean
public ConcurrentKafkaListenerContainerFactory<Long, MessageA> messageAContainerFactory(
        ConsumerFactory<Long, MessageA> consumerFactory) { ... }

@Bean
public ConcurrentKafkaListenerContainerFactory<Long, MessageB> messageBContainerFactory(
        ConsumerFactory<Long, MessageB> consumerFactory) { ... }

И затем я ссылаюсь на эти компоненты в соответствующих @KafkaListenerаннотации:

@KafkaListener(
        topics = "message-a-topic",
        containerFactory = "messageAContainerFactory")
public void consumeMessageA(MessageA messageA) { ... }

@KafkaListener(
        topics = "message-B-topic",
        containerFactory = "messageBContainerFactory")
public void consumeMessageB(MessageB messageB) { ... }

Кроме этих бобов в игре больше ничего нет.Конфигурация приложения для Kafka выглядит следующим образом:

spring:
  kafka:
    consumer:
      auto-offset-reset: latest
      max-poll-records: 10
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring:
          deserializer:
            value:
              delegate:
                class: org.springframework.kafka.support.serializer.JsonDeserializer
          json:
            value:
              default:
                type: com.example.MessageA

Я должен подчеркнуть, что я не заметил никаких проблем с функциональностью - сообщения, по-видимому, используются и обрабатываются, как и ожидалось.Это только влияет на способ обновления метрик.

Буду признателен за любой совет!

...