Ошибка при получении метаданных с идентификатором корреляции 92: {myTest = UNKNOWN_TOPIC_OR_PARTITION} - PullRequest
0 голосов
/ 14 апреля 2020

Я создал пример приложения для проверки кода моего производителя. Мое приложение работает нормально, когда я отправляю данные без ключа разделения. Но при указании ключа для разделения данных я получаю сообщение об ошибке:

[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 37 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 38 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 39 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}

как для потребителя, так и для производителя. Я много искал на inte rnet, они предложили проверить настройки kafka.acl. Я использую kafka в HDInsight , и я не знаю, как проверить это и решить эту проблему.

Мой кластер имеет следующую конфигурацию:

  1. Head Node : 2
  2. Рабочий узел: 4
  3. Zookeeper: 3

МОЙ код производителя:

public static void produce(String brokers, String topicName) throws IOException{

    // Set properties used to configure the producer
    Properties properties = new Properties();
      // Set the brokers (bootstrap servers)
    properties.setProperty("bootstrap.servers", brokers);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // specify the protocol for Domain Joined clusters

    //To create an Idempotent Producer
    properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
    properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
    properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id"); 
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    producer.initTransactions();
    // So we can generate random sentences
    Random random = new Random();
    String[] sentences = new String[] {
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature",
         };


    for(String sentence: sentences){
        // Send the sentence to the test topic
        try
        {
            String key=sentence.substring(0,2);
            producer.beginTransaction();
            producer.send(new ProducerRecord<String, String>(topicName,key,sentence)).get();
        }
        catch (Exception ex)
        {
          System.out.print(ex.getMessage());
            throw new IOException(ex.toString());
        }
        producer.commitTransaction();
    }
}

Также, My topi c состоит из 3 разделов с коэффициентом репликации = 3

Ответы [ 2 ]

0 голосов
/ 15 апреля 2020

Я сделал коэффициент репликации меньше, чем количество разделов, и это сработало для меня. Это звучит странно для меня, но да, оно начало работать после этого.

0 голосов
/ 15 апреля 2020

Ошибка ясно указывает на то, что созданный вами топи c (или раздел) не существует.

В конечном итоге вам нужно будет описать topi c (через CLI kafka-topics --describe --topic <topicName> или другими способами), чтобы проверить, верно ли это

Кафка в HDInsight, и у меня нет Идея, как проверить это и решить эту проблему.

ACL устанавливаются только в том случае, если вы установили кластер вместе с ними, но я считаю, что вы все еще можете перечислить ACL через zookeper-shell или SSHing в один из Had oop мастеров.

...