У меня проблемы с привлечением к работе самого простого потребителя кафки.Я использую kafka-clients-1.1.0.jar
Вот все, что я сделал.
- Запущен zookeeper в командной строке (все команды запускаются из)
zookeeper-server-start.bat ../../config/zookeeper.properties
Запущен сервер Kafka
kafka-server-start.bat ../../config/server.properties
Создал новую тему 'hellotopic' и подтвердил ее, перечислив темы
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hellotopic
Created topic "hellotopic".
Подтвердите, перечислив темы
D:\RC\Softwares\kafka_2.12-1.1.0\kafka_2.12-1.1.0\bin\windows>kafka-topics.bat --list --zookeeper localhost:2181
hellotopic
Отправить сообщение в тему и проверить то же самое на консоли потребителя
kafka-console-producer.bat --broker-list localhost:9092 --topic hellotopic --property "parse.key=true" --property "key.separator=:"
Ключ сообщения и значение, введенные как показано ниже
key1:value1
Вы можете видеть, что на потребителе консоли мы можем видеть сообщение в теме 'hellotopic'
kafka-console-consumer.bat --zookeeper localhost:2181 --topic hellotopic --from-beginning
Вывод команды выше приведен ниже.Мы можем видеть значение сообщения 'value1', которое было отправлено
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
value1
Теперь, когда у нас есть тема с сообщением, я запускаю свой простой потребительский код Java kafka, чтобы получить всесообщения в теме 'hellotopic'.Ниже приведен код
import java.util.Arrays\;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class SampleConsumer {
public static void main(String[] args) {
System.out.println("Start consumer code");
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-consumer-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("hellotopic"));
//while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
//}
System.out.println("End consumer code");
}
}
Когда мы запустим вышеприведенный класс, мы увидим вывод
Start consumer code
End consumer code
Много пытался найти проблему, но пока не повезло.Большое спасибо за помощь в этом простом примере.