API-интерфейс производителя Kafka не отправляет сообщения - PullRequest
0 голосов
/ 13 июля 2020

У меня есть .properties по умолчанию для брокера, и я использую kafka 2.12_2.5, который я использую для запуска сервера kafka в eclipse.

public class KafkaServer {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        FileInputStream istream = null;
        try {
          istream = new FileInputStream("C:\\Kafka\\server\\eclipse\\log4j.properties");
          props.load(istream);
          istream.close();
          PropertyConfigurator.configure(props);
        }
        catch (Exception e) {
        }
        String[] a = new String[1];
        a[0] = "C:\\Kafka\\server\\eclipse\\server.properties";
        Kafka.main(a);
    }

}

Затем я успешно создаю topi c используя следующий код

public class Topic {

    public static void main(String[] args) {
        NewTopic topic = new NewTopic("Queue", 1, (short) 1);
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (Admin admin = Admin.create(props)) {
            List<NewTopic> list = Arrays.asList(topic);
            admin.createTopics(list);
        }
    }

}

Сообщение, которое я проверяю, создается ли Topi c

public class TopicMonitor {

    public static void main(String[] args) {
        int count = 0;
        while (count < 1) {
            try {
                Thread.sleep(5000);
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092");
                Admin admin = Admin.create(props);
                DescribeTopicsResult dtr = admin.describeTopics(Arrays.asList("Queue"));
                Map<String, TopicDescription> map = dtr.all().get();
                map.forEach((a, b) -> {
                    System.out.println(a);
                    System.out.println(":name->");
                    System.out.println(b.name());
                    System.out.println(":internal->");
                    System.out.println(b.isInternal());
                    System.out.println(":partitions->");
                    b.partitions().forEach(c -> {
                        System.out.println(":InternalSyncReplicas");
                        System.out.println(c.isr());
                        System.out.println(":Leader->");
                        System.out.println(c.leader());
                    });
                });
            } catch (InterruptedException | ExecutionException e) {

            }
            ++count;
        }
    }

}

После чего я пишу Producer Api, но он не отправляет никаких сообщений, хотя он упоминает, что в журналах есть идентификатор фиксации

public class Producer {

    public static void main(String[] args) {
        Properties props = new Properties();
        FileInputStream istream = null;
        try {
            istream = new FileInputStream("C:\\Kafka\\server\\eclipse\\log4j.properties");
            props.load(istream);
            istream.close();
            PropertyConfigurator.configure(props);
        } catch (Exception e) {
        }
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try {
            org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 4; i++) {
                Future<RecordMetadata> f = producer.send(
                        new ProducerRecord<String, String>("Queue", Integer.toString(i), Integer.toString(i)),
                        new Callback() {
                            @Override
                            public void onCompletion(RecordMetadata metadata, Exception e) {
                                if (e != null) {
                                    e.printStackTrace();
                                } else {
                                    System.out.println("The offset of the record we just sent is: " + metadata.offset());
                                }
                            }
                        });
                RecordMetadata rmd = f.get();
                System.out.println(rmd.toString());
            }
            producer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("Done.");
    }

}

LOGS: - [2020-07-13 23: 33: 23,818] ИНФОРМАЦИЯ Версия Kafka: 2.5.0 (org. apache .kafka. common.utils.AppInfoParser) [2020-07-13 23: 33: 23,818] INFO Kafka commitId: 66563e712b0b9f84 (org. apache .kafka.common.utils.AppInfoParser)

Кто-нибудь может помочь? Насколько я понимаю, это может быть связано со слушателями, но не уверен.

...