Считайте файл json как JSonObject вместо строки, а затем отправьте его в тему Kafka.Я использую библиотеку gson для разбора (в качестве примера кода), но вы можете выбрать любую библиотеку разбора json на ваш выбор.
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.stream.JsonReader;
import java.io.FileReader;
public class Main {
static Gson gson = new Gson();
public static JsonObject readJSON(String filePath) throws Exception {
JsonReader reader = new JsonReader(new FileReader(filePath));
return gson.fromJson(reader, JsonObject.class);
}
public static void main(String[] args) throws IOException {
String topicName = "test";
Properties props = new Properties();
props.put("kafka.bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer < String, String > sampleProducer = new KafkaProducer < String, String > (props);
ProducerRecord < String, String > record = new ProducerRecord < String, String > (topicName, readJSON("data.json").toString());
sampleProducer.send(record);
sampleProducer.close();
}
}
Также, если вам просто нужно прочитать файл и отправить его в тему как есть, ине обрабатывать контент.Вы можете просто прочитать весь файл как String за один раз и отправить его, вместо потоковой потоковой передачи, это сохранит структуру данных json:
public static String readFileAsString(File file)
throws IOException {
InputStream fileInputStream = new FileInputStream(file);
byte[] buffer = new byte[fileInputStream.available()];
int length = fileInputStream.read(buffer);
fileInputStream.close();
return new String(buffer, 0, length);
}
ProducerRecord < String, String > record = new ProducerRecord < String, String > (topicName, readFileAsString(new File("data.json")));
ОБНОВЛЕНИЯ:
Чтобы передать данные файла json в качестве значения ключа в тему Kafka, вам все равно нужно проанализировать файл как объект json и затем передать его через свойства json.Пожалуйста, проверьте приведенный ниже пример кода, я анализирую файл json как объект Map с помощью Jacksons, а затем передаю его свойства по потоку для отправки в тему по одному.
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
//read json file as map object
private static Map<String, String> readJsonFileAsMap(File file) throws Exception{
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(file, new TypeReference<Map<String,String>>(){});
}
//stream data as key value pair
KafkaProducer<String,String> sampleProducer= new KafkaProducer<String,String>(props);
readJsonFileAsMap(file).forEach((k,v)->{
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test",k,v);
sampleProducer.send(record);
});
sampleProducer.close();
Если вы используете консольный потребитель для проверки данныхубедитесь, что print.key=true
, при желании вы также можете добавить разделитель key.separator=:
kafka-console-customer --bootstrap-server localhost: 9092 --topic test --from-start --property "print.key = true "-property" key.separator =: "