Я пытаюсь написать наивного продюсера Кафки, используя Java.Приложение принимает два ввода:
- Имя темы Kafka, для которой должны создаваться сообщения
- Путь к файлу, содержащему сообщения, которые должны быть отправлены в Kafka
Я написал следующий код.Когда я запускаю его, я вижу, что операторы System.out.println
печатают ожидаемые значения, но сообщения почему-то не передаются в Kafka.Что я должен изменить, чтобы заставить его работать?
package com.myname.kafka.producer;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class NaiveKafkaProducer {
private static final Properties properties = new Properties();
private static Producer<String, String> producer;
private static String topic;
private static BufferedReader br;
static {
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("request.required.acks", "all");
System.out.println("Creating Kafka producer with the following properties :: " + properties);
producer = new KafkaProducer<>(properties);
}
@SuppressWarnings("resource")
public static void main(String[] args) throws IOException {
try {
if(args.length != 0) {
topic = args[0];
File file = new File(args[1]);
br = new BufferedReader((Reader) new FileReader(file));
}
} catch (Exception e) {
System.out.println("Check input arguments. Error thrown while populating arguments to local variables");
e.printStackTrace();
}
String msg;
while ((msg = br.readLine()) != null) {
System.out.println("Message to publish : " + msg);
System.out.println("Topic : " + topic);
producer.send(new ProducerRecord<String, String>(topic, "", msg));
}
return;
}
}
Удивительно, но работает следующий код (в котором я все жестко запрограммировал):
package com.myname.kafka.producer;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class NaiveKafkaProducer {
private static final Properties properties = new Properties();
private static Producer<String, String> producer;
private static String topic;
private static BufferedReader br;
static {
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("request.required.acks", "all");
System.out.println("Creating Kafka producer with the following properties :: " + properties);
producer = new KafkaProducer<>(properties);
}
public static void main(String[] args) throws IOException {
try {
String[] msgs = new String[2];
msgs[0] = "message 1";
msgs[1] = "message 2";
topic = "mytopic"
for(String msg:msgs){
producer.send(new ProducerRecord<String, String>(topic, "", msg));
}
producer.close();
} catch (Exception e) {
System.out.println("Exception caught in main method while trying to produce the messages to Kafka");
e.printStackTrace();
}
}
}