Разбор JSON-данных с использованием Apache Kafka Streaming - PullRequest
0 голосов
/ 25 мая 2018

У меня был сценарий для чтения данных JSON из моей темы Kafka, и, используя версию Kafka 0.11, мне нужно написать код Java для потоковой передачи данных JSON, присутствующих в теме Kafka. Мой ввод - это данные Json, содержащие массивысловарей.

Теперь мое требование - получить поле «текст», ввести словарь, содержащийся в массиве, из данных json и передать все эти текстовые твиты в другую тему через Kafka Streaming.

Я написал код до здесь.Пожалуйста, помогите мне разобрать данные.

Java-код для потоковой передачи

final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

KStreamBuilder builder = new KStreamBuilder();

KStream<String, JsonNode> personstwitter =builder.stream(Serdes.String(), jsonSerde, "Persons");//taking the json node as input


personstwitter.to(Serdes.String(), jsonSerde,"Persons-output");

1 Ответ

0 голосов
/ 25 мая 2018

Я хотел бы предложить вам следующее, чтобы лучше контролировать данные JSON.

  1. записать Serializer и De-Serializer.
  2. Создание POJO на основе строки JSON.POJO - лучший способ лучше контролировать данные.
  3. Сопоставьте данные с POJO для доступа к необходимым данным.

POJO:

@JsonRootName("person")
public class Person implements Serializable {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    private String name;
    private String personalID;
    private String country;
    private String occupation;

    public Person() {

    }

    @JsonCreator
    public Person(@JsonProperty("name") String name,@JsonProperty("personalID") String personalID,
            @JsonProperty("country") String country,@JsonProperty("occupation") String occupation){
        this.name= name;
        this.personalID = personalID;
        this.country = country;
        this.occupation = occupation;
    }

    //getters and setters stripped
}

Сериализатор:

public class JsonSerializer<T> implements Serializer<T> {

    private ObjectMapper om = new ObjectMapper();

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // TODO Auto-generated method stub

    }

    @Override
    public byte[] serialize(String topic, T data) {
        byte[] retval = null;
        try {
            System.out.println(data.getClass());
            retval = om.writeValueAsString(data).getBytes();
        } catch (JsonProcessingException e) {
            throw new SerializationException();
        }
        return retval;
    }

}

Десериализатор:

public class JsonDeserializer<T> implements Deserializer<T> {

    private ObjectMapper om = new ObjectMapper();
    private Class<T> type;

    /*
     * Default constructor needed by kafka
     */
    public JsonDeserializer() {

    }

    public JsonDeserializer(Class<T> type) {
        this.type = type;
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> map, boolean arg1) {
        if (type == null) {
            type = (Class<T>) map.get("type");
        }

    }

    @Override
    public T deserialize(String undefined, byte[] bytes) {
        T data = null;
        if (bytes == null || bytes.length == 0) {
            return null;
        }

        try {
            System.out.println(getType());
            data = om.readValue(bytes, type);
        } catch (Exception e) {
            throw new SerializationException(e);
        }

        return data;
    }

    protected Class<T> getType() {
        return type;
    }

}

Потребитель:

public class ConsumerUtilities {

    public static Properties getProperties() {

        Properties configs = new Properties();
        configs.put(StreamsConfig.APPLICATION_ID_CONFIG,
                "Kafka test application");
        configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return configs;
    }

    public static KStreamBuilder getStreamingConsumer() {
        KStreamBuilder builder = new KStreamBuilder();
        return builder;
    }

    public static void getStreamData() {
        JsonSerializer<Person> personJsonSerializer = new JsonSerializer<>();
        JsonDeserializer<Person> personJsonDeserializer = new JsonDeserializer<>(
                Person.class);
        Serde<Person> personSerde = Serdes.serdeFrom(personJsonSerializer,
                personJsonDeserializer);
        KStreamBuilder builder = getStreamingConsumer();

        try {

            KStream<String, Person> kStream = builder.stream(Serdes.String(),
                    personSerde, "test");
            kStream.foreach(new ForeachAction<String, Person>() {

                @Override
                public void apply(String arg0, Person arg1) {
                    System.out.println(arg1.getCountry());                  
                }

            });
        } catch (Exception s) {
            s.printStackTrace();
        }
        KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties());
        kafkaStreams.start();
    }

}

Производитель:

public class ProducerUtilities {

    public static org.apache.kafka.clients.producer.Producer<String, Person> getProducer() {
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.CLIENT_ID_CONFIG,
                "kafka json producer");
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "com.kafka.api.serdes.JsonSerializer");

        org.apache.kafka.clients.producer.Producer<String, Person> producer = new KafkaProducer<String, Person>(
                configProperties);
        return producer;
    }

    public ProducerRecord<String, Person> createRecord(Person person) {
        ProducerRecord<String, Person> record = new ProducerRecord<String, Person>(
                "test", person);
        return record;
    }

}
...