Разбор JSON сообщения в Spark из потока Кафка - PullRequest
0 голосов
/ 06 января 2020

У меня есть поток событий (формат ниже), который мне нужно проанализировать в spark (java). Я могу прочитать поток, но не смог найти пример для преобразования сообщения в java bean.

{
    user_id  : string,
    session_id : string,
    event : string,
    page : string,
    timestamp : timestamp
}

Java Bean

public class Event implements Serializable {

    private String user_id;

    private String session_id;
    private String page;
    private String event;
    private Timestamp timestamp;
}

Код читать сообщение как String.

Dataset<String> lines = spark
                        .readStream()
                        .format("kafka")
                        .option("kafka.bootstrap.servers", "localhost:9092")
                        .option("subscribe", topics)
                        .load()
                        .selectExpr("CAST(value AS STRING)")
                        .as(Encoders.STRING());     

1 Ответ

0 голосов
/ 06 января 2020

Мне удалось заставить это работать, используя следующий подход.

        FlatMapFunction<String, Event> linesToEvents = new FlatMapFunction<String, Event>() {
            @Override
            public Iterator<Event> call(String line) throws JsonMappingException, JsonProcessingException {
                ObjectMapper mapper = new ObjectMapper();
                ArrayList<Event> eventList = new ArrayList<>();
                eventList.add(mapper.readValue(line, Event.class));
                return eventList.iterator();
            }
        };

        Dataset<Event> lines = spark
                                .readStream()
                                .format("kafka")
                                .option("kafka.bootstrap.servers", "localhost:9092")
                                .option("subscribe", topics)
                                .load()
                                .selectExpr("CAST(value AS STRING)")
                                .as(Encoders.STRING())
                                .flatMap(linesToEvents, Encoders.bean(Event.class));
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...