SchemaException - Ошибка чтения поля 'leader_id': длина строки -1 не может быть отрицательной - концентраторы событий Azure - Kafka - PullRequest
0 голосов
/ 17 февраля 2019

Я получаю это исключение

org.apache.kafka.common.protocol.types.SchemaException

, пока кафка выполняет повторную балансировку

Вот подробности:

  1. Использование события Azureконцентраторы.Доступ к нему с помощью API kafka

  2. «Kafka Enabled» = да, на лазурном портале

  3. с использованием: compile group: 'org.apache.kafka ', имя:' kafka-clients ', версия:' 1.0.2 '

  4. Использование группы потребителей

  5. Properties properties = new Properties();
    properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    properties.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
    properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.format("%s.servicebus.windows.net:9093", this.namespace));
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MeasurementDeSerializer.class.getName());
    properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupName);
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    

У меня 2 клиента на 2 разных компьютерах

Когда они оба работают, каждый получает 16 разделов из доступных 32 разделов.

Когда я выключаю один из них, всечасти перебалансированы для другой.

на экземпляре, который все еще работает, я получу:

  1. Разделы отменены [16, 17, 18, 19,20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]

  2. Затем из цикла пула я получу это исключение:

    org.apache.kafka.common.protocol.types.SchemaException: Ошибка чтения поля 'leader_id': длина строки -1 не может быть отрицательной в org.apache.kafka.common.protocol.types.Schema.read (схема.java: 76) на org.apache.kafka.common.protocol.ApiKeys.parseResponse (ApiKeys.java:279) в org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics (NetworkClient.java:586) в org.apache.kafslij.lijliClientClientClientClientClientClientClientClientв org.apache.kafka.clients.NetworkClient.poll (NetworkClient.java:469) в org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:258) в org.apache.kaf,С(ConsumerCoordinator.java:295) в org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce (KafkaConsumer.java:1146) в org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1111)

С другой стороны,при переходе в другую сторону проблем не возникает

  1. Запустите первый экземпляр

  2. Экземпляр 1 получит все 32 раздела

  3. Запуск экземпляра 2

  4. Запуск повторной балансировки

  5. Экземпляр 1 теряет 16 частей

  6. экземпляр 2 получает 16 частей

Есть идеи, что может вызвать это исключение?

...