У меня есть .properties по умолчанию для брокера, и я использую kafka 2.12_2.5, который я использую для запуска сервера kafka в eclipse.
public class KafkaServer {
public static void main(String[] args) {
Properties props = new Properties();
FileInputStream istream = null;
try {
istream = new FileInputStream("C:\\Kafka\\server\\eclipse\\log4j.properties");
props.load(istream);
istream.close();
PropertyConfigurator.configure(props);
}
catch (Exception e) {
}
String[] a = new String[1];
a[0] = "C:\\Kafka\\server\\eclipse\\server.properties";
Kafka.main(a);
}
}
Затем я успешно создаю topi c используя следующий код
public class Topic {
public static void main(String[] args) {
NewTopic topic = new NewTopic("Queue", 1, (short) 1);
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (Admin admin = Admin.create(props)) {
List<NewTopic> list = Arrays.asList(topic);
admin.createTopics(list);
}
}
}
Сообщение, которое я проверяю, создается ли Topi c
public class TopicMonitor {
public static void main(String[] args) {
int count = 0;
while (count < 1) {
try {
Thread.sleep(5000);
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
Admin admin = Admin.create(props);
DescribeTopicsResult dtr = admin.describeTopics(Arrays.asList("Queue"));
Map<String, TopicDescription> map = dtr.all().get();
map.forEach((a, b) -> {
System.out.println(a);
System.out.println(":name->");
System.out.println(b.name());
System.out.println(":internal->");
System.out.println(b.isInternal());
System.out.println(":partitions->");
b.partitions().forEach(c -> {
System.out.println(":InternalSyncReplicas");
System.out.println(c.isr());
System.out.println(":Leader->");
System.out.println(c.leader());
});
});
} catch (InterruptedException | ExecutionException e) {
}
++count;
}
}
}
После чего я пишу Producer Api, но он не отправляет никаких сообщений, хотя он упоминает, что в журналах есть идентификатор фиксации
public class Producer {
public static void main(String[] args) {
Properties props = new Properties();
FileInputStream istream = null;
try {
istream = new FileInputStream("C:\\Kafka\\server\\eclipse\\log4j.properties");
props.load(istream);
istream.close();
PropertyConfigurator.configure(props);
} catch (Exception e) {
}
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try {
org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 4; i++) {
Future<RecordMetadata> f = producer.send(
new ProducerRecord<String, String>("Queue", Integer.toString(i), Integer.toString(i)),
new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});
RecordMetadata rmd = f.get();
System.out.println(rmd.toString());
}
producer.close();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Done.");
}
}
LOGS: - [2020-07-13 23: 33: 23,818] ИНФОРМАЦИЯ Версия Kafka: 2.5.0 (org. apache .kafka. common.utils.AppInfoParser) [2020-07-13 23: 33: 23,818] INFO Kafka commitId: 66563e712b0b9f84 (org. apache .kafka.common.utils.AppInfoParser)
Кто-нибудь может помочь? Насколько я понимаю, это может быть связано со слушателями, но не уверен.