Naive Kafka Producer не работает, когда я пытаюсь создать сообщения, которые читаются из файла - PullRequest
0 голосов
/ 08 июня 2018

Я пытаюсь написать наивного продюсера Кафки, используя Java.Приложение принимает два ввода:

  1. Имя темы Kafka, для которой должны создаваться сообщения
  2. Путь к файлу, содержащему сообщения, которые должны быть отправлены в Kafka

Я написал следующий код.Когда я запускаю его, я вижу, что операторы System.out.println печатают ожидаемые значения, но сообщения почему-то не передаются в Kafka.Что я должен изменить, чтобы заставить его работать?

package com.myname.kafka.producer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NaiveKafkaProducer {

    private static final Properties properties = new Properties();

    private static Producer<String, String> producer;

    private static String topic;

    private static BufferedReader br;

    static {
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("request.required.acks", "all");
        System.out.println("Creating Kafka producer with the following properties :: " + properties);
        producer = new KafkaProducer<>(properties);
    }

    @SuppressWarnings("resource")
    public static void main(String[] args) throws IOException {
        try {
            if(args.length != 0) {
                topic = args[0];
                File file = new File(args[1]);
                br = new BufferedReader((Reader) new FileReader(file));
            }
        } catch (Exception e) {
            System.out.println("Check input arguments. Error thrown while populating arguments to local variables");
            e.printStackTrace();
        }

        String msg;
        while ((msg = br.readLine()) != null) {
            System.out.println("Message to publish : " + msg);
            System.out.println("Topic : " + topic);
            producer.send(new ProducerRecord<String, String>(topic, "", msg));
        }
        return;
    }
}

Удивительно, но работает следующий код (в котором я все жестко запрограммировал):

package com.myname.kafka.producer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NaiveKafkaProducer {

    private static final Properties properties = new Properties();

    private static Producer<String, String> producer;

    private static String topic;

    private static BufferedReader br;

    static {
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("request.required.acks", "all");
        System.out.println("Creating Kafka producer with the following properties :: " + properties);
        producer = new KafkaProducer<>(properties);
    }

    public static void main(String[] args) throws IOException {
            try {
                String[] msgs = new String[2];
                msgs[0] = "message 1";
                msgs[1] = "message 2";
                topic = "mytopic"
                for(String msg:msgs){
                    producer.send(new ProducerRecord<String, String>(topic, "", msg));
                }
                producer.close();
            } catch (Exception e) {
                System.out.println("Exception caught in main method while trying to produce the messages to Kafka");
                e.printStackTrace();
            }
    }
}

1 Ответ

0 голосов
/ 08 июня 2018

есть критический метод, вызванный во втором фрагменте и отсутствующий в первом

producer.close();

из документа для этого метода:

Закрыть эторежиссер.Этот метод блокируется до тех пор, пока все ранее отправленные запросы не будут завершены.

Когда вы вызываете метод produce, в действительности это не означает, что сообщение было создано.Метод возвращает вас future.Вы можете подождать, пока каждое сообщение будет создано, вызывая get() для каждого результата метода продукта.

...