Spring Kafka не может потреблять записи - PullRequest
0 голосов
/ 11 июля 2019

Мы используем Spring Kafka для потребления записей партиями. Иногда мы сталкиваемся с проблемой, при которой приложение запускается, и оно не потребляет никаких записей, даже если непрочитанных сообщений достаточно. Вместо этого мы постоянно видим, что информационные журналы говорят.

[INFO]-[FetchSessionHandler:handleError:440] - [Consumer clientId=consumer-2, groupId=groupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1027: org.apache.kafka.common.errors.DisconnectException. 

Люди сталкиваются с этой проблемой, и все говорят, что игнорируют ее, так как это всего лишь информационный журнал. Даже через некоторое время мы видим, что приложение начинает собирать записи, ничего не делая. Но очень непредсказуемо, сколько времени может потребоваться, чтобы начать использовать записи :(

Мы не увидели эту ошибку, когда использовали поток облака Spring. Не уверен, что мы упустили какую-либо конфигурацию в spring-kafka.

Кто-нибудь сталкивался с этой проблемой в прошлом, пожалуйста, сообщите нам, если мы что-то упустили. У нас огромная нагрузка в наших темах, и если есть большая задержка, может ли это произойти?

Мы используем Spring Kafka из 2.2.2. RELEASE Пружинная загрузка 2.1.2.RELEASE Кафка 0.10.0.1 (Мы понимаем, что он очень старый, из-за неизбежных причин, нам приходится использовать это: ()

Вот наш код:

application.yml

li.topics: CUSTOM.TOPIC.JSON
    spring:
      application:
        name: DataPublisher
      kafka:
        listener:
          type: batch
          ack-mode: manual_immediate
        consumer:
          enable-auto-commit: false
          max-poll-records: 500
          fetch-min-size: 1
          fetch-max-wait: 1000
          group-id: group-dev-02
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer:CustomResourceDeserialiser
          auto-offset-reset: earliest

Потребитель:

public class CustomKafkaBatchConsumer {


  @KafkaListener(topics = "#{'${li.topics}'.split(',')}", id = "${spring.kafka.consumer.group-id}")
  public void receiveData(@Payload List<CustomResource> customResources,
      Acknowledgment acknowledgment,
      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
      @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
}
}

Deserialiser:

public class CustomResourceDeserialiser implements Deserializer<CustomResource> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
  }

  @Override
  public CustomResource deserialize(String topic, byte[] data) {
    if (data != null) {
      try {
        ObjectMapper objectMapper = ObjectMapperFactory.getInstance();
        return objectMapper.readValue(data, CustomResource.class);
      } catch (IOException e) {
        log.error("Failed to deserialise with {}",e.getMessage());
      }
    }
    return null;
  }

  @Override
  public void close() {

  }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...