Мы используем Spring Kafka для потребления записей партиями. Иногда мы сталкиваемся с проблемой, при которой приложение запускается, и оно не потребляет никаких записей, даже если непрочитанных сообщений достаточно. Вместо этого мы постоянно видим, что информационные журналы говорят.
[INFO]-[FetchSessionHandler:handleError:440] - [Consumer clientId=consumer-2, groupId=groupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1027: org.apache.kafka.common.errors.DisconnectException.
Люди сталкиваются с этой проблемой, и все говорят, что игнорируют ее, так как это всего лишь информационный журнал. Даже через некоторое время мы видим, что приложение начинает собирать записи, ничего не делая. Но очень непредсказуемо, сколько времени может потребоваться, чтобы начать использовать записи :(
Мы не увидели эту ошибку, когда использовали поток облака Spring. Не уверен, что мы упустили какую-либо конфигурацию в spring-kafka.
Кто-нибудь сталкивался с этой проблемой в прошлом, пожалуйста, сообщите нам, если мы что-то упустили. У нас огромная нагрузка в наших темах, и если есть большая задержка, может ли это произойти?
Мы используем Spring Kafka из 2.2.2. RELEASE
Пружинная загрузка 2.1.2.RELEASE
Кафка 0.10.0.1 (Мы понимаем, что он очень старый, из-за неизбежных причин, нам приходится использовать это: ()
Вот наш код:
application.yml
li.topics: CUSTOM.TOPIC.JSON
spring:
application:
name: DataPublisher
kafka:
listener:
type: batch
ack-mode: manual_immediate
consumer:
enable-auto-commit: false
max-poll-records: 500
fetch-min-size: 1
fetch-max-wait: 1000
group-id: group-dev-02
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer:CustomResourceDeserialiser
auto-offset-reset: earliest
Потребитель:
public class CustomKafkaBatchConsumer {
@KafkaListener(topics = "#{'${li.topics}'.split(',')}", id = "${spring.kafka.consumer.group-id}")
public void receiveData(@Payload List<CustomResource> customResources,
Acknowledgment acknowledgment,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
}
}
Deserialiser:
public class CustomResourceDeserialiser implements Deserializer<CustomResource> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public CustomResource deserialize(String topic, byte[] data) {
if (data != null) {
try {
ObjectMapper objectMapper = ObjectMapperFactory.getInstance();
return objectMapper.readValue(data, CustomResource.class);
} catch (IOException e) {
log.error("Failed to deserialise with {}",e.getMessage());
}
}
return null;
}
@Override
public void close() {
}
}