Kafka Java Consumer висит на Jenkins и не читает сообщения на локальном - PullRequest
0 голосов
/ 30 мая 2018

Я создал потребителя, используя следующую зависимость

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>

Ниже приведен код для потребителя:

    private static String TopicName = "Automation_kafka_test";
    LOGGER.info("Initializing the consumer");
    KafkaConsumer<String, String> myKafkaCascadeConsumer = new KafkaConsumer<String, String>(KafkaCascadeConsumer.kafkaCascadeConfiguration());
    for (Map.Entry<String, Object> entry : KafkaCascadeConsumer.kafkaCascadeConfiguration().entrySet()) 
    {
     LOGGER.info("Key = "+entry.getKey() + ", Value =" + entry.getValue());
    }

 KafkaConsumerHelper.readKafkaMessages(myKafkaCascadeConsumer, TopicName);
        myKafkaCascadeConsumer.close();


// read Kafka messages
        public static void readKafkaMessages(KafkaConsumer<String, String> myKafkaConsumer, String topicName) {
            LOGGER.info("Subscribing to Topic =" + topicName);
            myKafkaConsumer.subscribe(Arrays.asList(topicName));
                 while (true) {
                     ConsumerRecords<String, String> records = myKafkaConsumer.poll(100);
                     for (ConsumerRecord<String, String> record : records)
                         System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                 }

        }

Ниже выводится:

2018-05-30 14:23:21,247  INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:81) - Initializing the consumer
2018-05-30 14:23:21,869  INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:87) - Key = key.deserializer, Value =org.apache.kafka.common.serialization.StringDeserializer
2018-05-30 14:23:21,869  INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:87) - Key = value.deserializer, Value =org.apache.kafka.common.serialization.StringDeserializer
2018-05-30 14:23:21,869  INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:87) - Key = enable.auto.commit, Value =false
2018-05-30 14:23:21,869  INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:87) - Key = group.id, Value =AutomationRamtest1
2018-05-30 14:23:21,870  INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:87) - Key = consumer.timeout.ms, Value =50000
2018-05-30 14:23:21,871  INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:87) - Key = bootstrap.servers, Value =ABsrd00xxx:9092,ABsrd00yyy:9092 ***** masked for privacy***
2018-05-30 14:23:21,871  INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:87) - Key = auto.commit.interval.ms, Value =1000
2018-05-30 14:23:21,871  INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:87) - Key = auto.offset.reset, Value =earliest
2018-05-30 14:23:21,887  INFO [TestNG-test=Test-1] (KafkaConsumerHelper.java:53) - Subscribing to Topic =Automation_kafka_test

Дженкинс застрялна вышеупомянутое утверждение, и никогда не выходить из негоТакже этот код никогда не возвращает никаких сообщений на мой локальный компьютер, но при использовании CLI разработчик получает сообщение обратно из той же темы, которую я читаю.

Также я использую ту же настройку bootstrap.servers для производителя, и она работает.

Не могли бы вы сообщить мне, если я делаю что-то не так.

1 Ответ

0 голосов
/ 15 июня 2018

На самом деле проблема была с групповым назначением.Если существует только один раздел, и если какой-либо предыдущий потребитель не был завершен должным образом, то Kafka ожидает максимально длинное значение, чтобы прекратить его.В промежутке между тем, если вы попытаетесь добавить потребителя в ту же группу, он не будет читать.

Ниже в статье объясняется связь между группой, разделом и темой.

https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html

...