Я настроил несколько потребителей Kafka с помощью Spring Kafka, которые работают нормально. Однако, случайно, я только что обнаружил, что, когда один из потребителей выходит из строя, я не вижу каких-либо исключений в журнале ошибок. Я вижу только следующее в catalina.out:
2019-04-13 02:07:37.008 INFO 17875 --- [alconDataRiver1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-6, groupId=dataRiver1] Group coordinator xxx-xx2x1-xxx-xxx.xxxxxx.com:38900 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
......
2019-04-13 02:07:38.014 INFO 17875 --- [ntainer#2-0-C-1] o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-2, groupId=dataRiver1] Error sending fetch request (sessionId=1593042214, epoch=92705) to node 1: org.apache.kafka.common.errors.DisconnectException.
Я не знаю, почему в этом случае не создаются исключения.
Я использую Spring Boot для своего проекта.
Вот как я настроил потребителя:
@Bean
public ConsumerFactory<String, GenericData.Record> fasConsumerFactoryFirst() {
Map<String, Object> dataRiverProps = getFasDataRiverProps();
dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("fas1.bootstrap.servers"));
dataRiverProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, env.getProperty("fas1.schema.registry.url"));
return new DefaultKafkaConsumerFactory<>(dataRiverProps);
}
@KafkaListener(topics = "#{'${" + PROP_KAFKA_FAS_TOPICS + "}'.split(',')}", containerFactory = "fasKafkaListenerContainerFactoryFirst")
public void consumeAvroFirst(List<Message<GenericData.Record>> list) {
consumeJsonMessageBatch(convertAvroToJsonBatch(list), KAFKA_CONSUMER_FAS_1);
}
Интересно, есть ли какие-либо конфигурации, которые я пропустил, которые могли бы помочь генерировать исключения (или, возможно, предупреждения) в таком случае, которые будут записаны в журналы ошибок, чтобы я мог легко отслеживать. Спасибо!