Я создал потребителя, используя следующую зависимость
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
Ниже приведен код для потребителя:
private static String TopicName = "Automation_kafka_test";
LOGGER.info("Initializing the consumer");
KafkaConsumer<String, String> myKafkaCascadeConsumer = new KafkaConsumer<String, String>(KafkaCascadeConsumer.kafkaCascadeConfiguration());
for (Map.Entry<String, Object> entry : KafkaCascadeConsumer.kafkaCascadeConfiguration().entrySet())
{
LOGGER.info("Key = "+entry.getKey() + ", Value =" + entry.getValue());
}
KafkaConsumerHelper.readKafkaMessages(myKafkaCascadeConsumer, TopicName);
myKafkaCascadeConsumer.close();
// read Kafka messages
public static void readKafkaMessages(KafkaConsumer<String, String> myKafkaConsumer, String topicName) {
LOGGER.info("Subscribing to Topic =" + topicName);
myKafkaConsumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, String> records = myKafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
}
Ниже выводится:
2018-05-30 14:23:21,247 INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:81) - Initializing the consumer
2018-05-30 14:23:21,869 INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:87) - Key = key.deserializer, Value =org.apache.kafka.common.serialization.StringDeserializer
2018-05-30 14:23:21,869 INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:87) - Key = value.deserializer, Value =org.apache.kafka.common.serialization.StringDeserializer
2018-05-30 14:23:21,869 INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:87) - Key = enable.auto.commit, Value =false
2018-05-30 14:23:21,869 INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:87) - Key = group.id, Value =AutomationRamtest1
2018-05-30 14:23:21,870 INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:87) - Key = consumer.timeout.ms, Value =50000
2018-05-30 14:23:21,871 INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:87) - Key = bootstrap.servers, Value =ABsrd00xxx:9092,ABsrd00yyy:9092 ***** masked for privacy***
2018-05-30 14:23:21,871 INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:87) - Key = auto.commit.interval.ms, Value =1000
2018-05-30 14:23:21,871 INFO [TestNG-test=Test-1] (US000000_KafkaTest.java:87) - Key = auto.offset.reset, Value =earliest
2018-05-30 14:23:21,887 INFO [TestNG-test=Test-1] (KafkaConsumerHelper.java:53) - Subscribing to Topic =Automation_kafka_test
Дженкинс застрялна вышеупомянутое утверждение, и никогда не выходить из негоТакже этот код никогда не возвращает никаких сообщений на мой локальный компьютер, но при использовании CLI разработчик получает сообщение обратно из той же темы, которую я читаю.
Также я использую ту же настройку bootstrap.servers для производителя, и она работает.
Не могли бы вы сообщить мне, если я делаю что-то не так.