Есть ли простой способ конвертировать JavaRDD <String>, содержащий JSON, в пользовательский объект Java - PullRequest
0 голосов
/ 17 июня 2019

У меня есть искровой потоковый контекст, получающий потоки данных от потребителя Kafka.Данные содержат объекты JSON.Мне нужно преобразовать это в пользовательский объект Java, чтобы я мог выполнить некоторую обработку.Есть ли простой способ сделать это?По сути, мне нужен способ преобразования JavaRDD в обычную строку, чтобы я мог использовать gson.fromJSON для преобразования его в мой простой объект класса POJO.

Я пробовал какой-то метод, но получаю проблемы с сериализацией

JavaDStream jds = stream.map (x -> x.value ());

    jds.foreachRDD(x -> System.out.println(x.count()));

    jds.foreachRDD(new VoidFunction<JavaRDD<String>>() {

        private static final long serialVersionUID = 1L;

        @Override
        public void call(JavaRDD<String> rdd) {
            rdd.foreach(a -> {
                TransactionData tr = gson.fromJson(a, TransactionData.class);
            }
            );
        }

TransactionData - это обычный класс Java-бина с двумя полями id и amount и их методом получения / установки

В приведенном выше коде я получаю сообщение об ошибке сериализации.Это ошибка: org.apache.spark.SparkException: задача не сериализуема. Причина: java.io.NotSerializableException: com.google.gson.Gson Стек сериализации: - объект не сериализуем (класс: com.google.gson.Gson,значение: {serializeNulls: falsefactories: [Factory [typeHierarchy = com.google.gson.JsonElement, adapter = com.google.gson.internal.bind.TypeAdapters $ 25 @ 35c645ea] ....

Любые идеи покак это решить?

1 Ответ

0 голосов
/ 17 июня 2019

Проблема здесь в том, что Gson не сериализуем, его можно исправить, избегая сериализации Gson и создавая экземпляр только во время обработки. Класс Wrapper для Gson может быть создан и использован в основном коде; Класс Car используется в примере вместо TransactionData:

public class CarConverter implements Serializable {
transient Gson gson;

private Gson getGson() {
    if (gson == null) {
        gson = new Gson();
    }
    return gson;
}

public JavaRDD<Car> convert(JavaRDD<String> rdd) {
    return rdd.map(a -> getGson().fromJson(a, Car.class));
}
}

Пример использования:

    List<String> data = Lists.newArrayList("{\"brand\":\"Jeep\", \"doors\": 3}", "{\"brand\":\"Slavuta\", \"doors\": 4}");
    JavaRDD<String> rdd = jsc().parallelize(data);
    CarConverter converter = new CarConverter();
    JavaRDD<Car> result = converter.convert(rdd);
...