Как отправить потоковые данные JSON в виде пары «ключ-значение» в потребителя kafka - PullRequest
0 голосов
/ 30 ноября 2018

я пишу код jave для чтения данных json из локальной файловой системы и хочу отправить эти данные в виде пары ключ-значение

public static void main(String[] args) throws IOException 
{
        Stream<String> objec = Files.lines(Paths.get("path\\data.json"));


                String topicName="test";

                Properties props=new Properties();
                props.put("kafka.bootstrap.servers", "localhost:9092,localhost:9093");
                props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


                KafkaProducer<String,String> sampleProducer= new KafkaProducer<String,String>(props);
                objec.forEach(f->{
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName,f);        
                sampleProducer.send(record);
                });
                sampleProducer.close();

Но когда я запускаю эту программу, она отправляет данные в kafkaconsumerкак строка, как я могу отправить данные JSON в качестве пары значения ключа потребителю Kafka ...

здесь образец файла JSON

{  
   "wifi_result":"1",
   "mic_result":"1",
   "video_result":"1",
   "touch_result":"1",
   "proximity_result":"1",
   "vibrator_result":"1",
   "power_key":"2",
   "accelerometer":"0",
   "earphone":"1",
   "memory_result":"1",
   "memory_internalSD":"1",
   "memory_internalSDSize":"25.0GB",
   "memory_externalSD":"0",
   "memory_externalSDSize":"",
   "memory_internalflash":"1",
   "memory_internalflashSize":"2.0GB",
   "vol_key_down":"0",
   "menu_key":"1",
   "headset_result":"1",

}

Помощь будет признателен ... Спасибозаранее ...

1 Ответ

0 голосов
/ 30 ноября 2018

Считайте файл json как JSonObject вместо строки, а затем отправьте его в тему Kafka.Я использую библиотеку gson для разбора (в качестве примера кода), но вы можете выбрать любую библиотеку разбора json на ваш выбор.

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.stream.JsonReader;
import java.io.FileReader;

public class Main {

    static Gson gson = new Gson();

    public static JsonObject readJSON(String filePath) throws Exception {
     JsonReader reader = new JsonReader(new FileReader(filePath));
     return gson.fromJson(reader, JsonObject.class);
    }

    public static void main(String[] args) throws IOException {

     String topicName = "test";

     Properties props = new Properties();
     props.put("kafka.bootstrap.servers", "localhost:9092,localhost:9093");
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


     KafkaProducer < String, String > sampleProducer = new KafkaProducer < String, String > (props);
     ProducerRecord < String, String > record = new ProducerRecord < String, String > (topicName, readJSON("data.json").toString());
     sampleProducer.send(record);
     sampleProducer.close();
    }
}

Также, если вам просто нужно прочитать файл и отправить его в тему как есть, ине обрабатывать контент.Вы можете просто прочитать весь файл как String за один раз и отправить его, вместо потоковой потоковой передачи, это сохранит структуру данных json:

    public static String readFileAsString(File file)
    throws IOException {
     InputStream fileInputStream = new FileInputStream(file);
     byte[] buffer = new byte[fileInputStream.available()];
     int length = fileInputStream.read(buffer);
     fileInputStream.close();
     return new String(buffer, 0, length);
    }

    ProducerRecord < String, String > record = new ProducerRecord < String, String > (topicName, readFileAsString(new File("data.json")));

ОБНОВЛЕНИЯ:

Чтобы передать данные файла json в качестве значения ключа в тему Kafka, вам все равно нужно проанализировать файл как объект json и затем передать его через свойства json.Пожалуйста, проверьте приведенный ниже пример кода, я анализирую файл json как объект Map с помощью Jacksons, а затем передаю его свойства по потоку для отправки в тему по одному.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

//read json file as map object
    private static Map<String, String> readJsonFileAsMap(File file) throws Exception{
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(file, new TypeReference<Map<String,String>>(){});
    }

//stream data as key value pair
        KafkaProducer<String,String> sampleProducer= new KafkaProducer<String,String>(props);
        readJsonFileAsMap(file).forEach((k,v)->{
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("test",k,v);
            sampleProducer.send(record);
        });
        sampleProducer.close();

Если вы используете консольный потребитель для проверки данныхубедитесь, что print.key=true, при желании вы также можете добавить разделитель key.separator=:

kafka-console-customer --bootstrap-server localhost: 9092 --topic test --from-start --property "print.key = true "-property" key.separator =: "

...