Конвертировать сложный вложенный Json в Spark Dataframe в JAVA - PullRequest
0 голосов
/ 21 мая 2018

Может ли кто-нибудь помочь с кодом Java для преобразования следующего JSON в Spark Dataframe ..

Примечание. Это не логика файла

: прослушайте тему кафки T1, прочитайте каждыйзаписать в СДР и применить дополнительную логику, преобразовать полученные данные в объект Json и записать их в другую тему T2 в Kafka.

Структура T2 приведена ниже.

JSON:

 [  
   {  
      "@tenant_id":"XYZ",
      "alarmUpdateTime":1526342400000,
      "alarm_id":"AB5C9123",
      "alarm_updates":[  
         {  
            "alarmField":"Severity",
            "new_value":"Minor",
            "old_value":"Major"
         },
         {  
            "alarmField":"state",
            "new_value":"UPDATE",
            "old_value":"NEW"
         }
      ],
      "aucID":"5af83",
      "inID":"INC15234567",
      "index":"test",
      "product":"test",
      "source":"ABS",
      "state":"NEW"
   }
]

Создано классов:

    ClassAlarm{

        String @tenant_id;
        String alarm_id;
        .
        .
        List <AlarmUpdate> update;
        Get and Setter functions for all variables
    }

AlarmUpdate{

    String alarmField;
    String oldVal;
    String NewVal;

    Get and Setter functions for all variables
} 

AppClass{


     void static main(){
             Alarm alarmObj = new Alarm();
          //set values for variables in alarmObj.
           Dataset <Row> results = jobCtx.getSparkSession().createDataFrame(Arrays.asList(alarmObj), Alarm.class)

           //At this point seeing following errors.

      }

}

Ошибка:

2018-05-15 13:40:48 ОШИБКА JobScheduler - Ошибка запуска задания потоковой передачи задания 1526406040000 мс.0 scalaОшибкакатализатор_2.11-2.2.0.jar: 2.2.0] в org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl (CatalystTypeConverters.scala: 231) ~ [spark-катализатор_2.11-2.2.0.jar: 2.2.0] at org.apache.spark.sql.catalyst. CatalystTypeConverters.scala: 170) ~ [spark-катализатор_2.11-2.2.0.jar: 2.2.0] в org.apache.spark.sql.catalyst.CatalystTypeConverters $ ArrayConverter.toCatalystImpl (CatalystTypeConverters.scala: 154) ~ [spark-catalyst_2.11-2.2.0.jar: 2.2.0] at org.apache.spark.sql.catalyst.CatalystTypeConverters $ CatalystTypeConverter.toCatalyst (CatalystTypeConverters.scala: 103) ~ [spark-катализатор_2.11-2.2.0.jar: 2.2.0] at org.apache.spark.sql.catalyst.CatalystTypeConverters $$ anonfun $ createToCatalystConverter $ 2.apply (CatalystTypeConverters.scala: 379) ~ [spark-катализатор_2.11-2.2.0.jar: 2.2.0]в org.apache.spark.sql.SQLContext $$ anonfun $ beansToRows $ 1 $$ anonfun $ apply $ 1.apply (SQLContext.scala: 1105) ~ [spark-sql_2.11-2.2.0.jar: 2.2.0] вorg.apache.spark.sql.SQLContext $$ anonfun $ beansToRows $ 1 $$ anonfun $ применять $ 1.apply (SQLContext.scala: 1105) ~ [spark-sql_2.11-2.2.0.jar: 2.2.0] в scala.collection.TraversableLike $$ anonfun $ map $ 1.apply (TraversableLike.scala: 234) ~ [jaf-sdk-2.4.0.jar :?] в scala.collection.TraversableLike $$ anonfun $ map $ 1.apply (TraversableLike.scala: 234) ~ [jaf-sdk-2.4.0.jar :?] в scala.collection.IndexedSeqOptimized $ class.foreach (IndexedSeqOptimized.scala: 33) ~ [jaf-sdk-2.4.0.jar :?] в scala.collection.mutable.ArrayOps $ ofRef.foreach (ArrayOps.scala: 186) ~ [jaf-sdk-2.4.0.jar :?] в scala.collection.TraversableLike $ class.map (TraversableLike.scala: 234) ~ [jaf-sdk-2.4.0.jar :?] в scala.collection.mutable.ArrayOps $ofRef.map (ArrayOps.scala: 186) ~ [jaf-sdk-2.4.0.jar :?] at org.apache.spark.sql.SQLContext $$ anonfun $ beansToRows $ 1.apply (SQLContext.scala: 1105) ~[spark-sql_2.11-2.2.0.jar: 2.2.0] at org.apache.spark.sql.SQLContext $$ anonfun $ beansToRows $ 1.apply (SQLContext.scala: 1103) ~ [spark-sql_2.11-2.2.0.jar: 2.2.0] на scala.collection.Iterator $$ anon $ 11.next (Iterator.scala: 409) ~ [jaf-sdk-2.4.0.jar :?] на scala.collection.Iterator $ class.toStream (Iterator.scala: 1322) ~ [jaf-sdk-2.4.0.jar :?] at scala.collection.AbstractIterator.toStream (Iterator.scala: 1336) ~ [jaf-sdk-2.4.0.jar :?] в scala.collection.TraversableOnce $ class.toSeq (TraversableOnce.scala: 298) ~ [jaf-sdk-2.4.0.jar :?] в scala.collection.AbstractIterator.toSeq (Iterator.scala: 1336) ~ [jaf-sdk-2.4.0.jar :?] at org.apache.spark.sql.SparkSession.createDataFrame (SparkSession.scala: 406) ~ [spark-sql_2.11-2.2.0.jar:2.2.0]в com.ca.alarmupdates.AlarmUpdates.lambda $ null $ 0 (AlarmUpdates.java:85) ~ [classes / :?] в java.util.Arrays $ ArrayList.forEach (Arrays.java:3880) ~ [?: 1.8.0_161] at com.ca.alarmupdates.AlarmUpdates.lambda $ main $ f87f782d $ 1 (AlarmUpdates.java:58) ~ [classes / :?] at org.apache.spark.streaming.api.java.JavaDStreamLike $$ anonfun $ foreachRD$ 1.apply (JavaDStreamLike.scala: 272) ~ [spark-streaming_2.11-2.2.0.jar: 2.2.0] в org.apache.spark.streaming.api.java.JavaDStreamLike $$ anonfun $ foreachRDD $ 1.apply(JavaDStreamLike.scala: 272) ~ [spark-streaming_2.11-2.2.0.jar: 2.2.0] в org.apache.spark.streaming.dstream.DStream $$ anonfun $ foreachRDD $ 1 $$ anonfun $ apply $ mcV$ sp $ 3.apply (DStream.scala: 628) ~ [spark-streaming_2.11-2.2.0.jar: 2.2.0] в org.apache.spark.streaming.dstream.DStream $$ anonfun $ foreachRDD $ 1 $$anonfun $ apply $ mcV $ sp $ 3.apply (DStream.scala: 628) ~ [spark-streaming_2.11-2.2.0.jar: 2.2.0] в org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun$ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply $ mcV $ sp (ForEachDStream.scala: 51) ~ [spark-streaming_2.11-2.2.0.jar: 2.2.0] at org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (ForEachDStream.scala: 51) ~[spark-streaming_2.11-2.2.0.jar: 2.2.0] at org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (ForEachDStream.scala:51) ~ [spark-streaming_2.11-2.2.0.jar: 2.2.0] в org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties (DStream.scala: 416) ~ [spark-streaming_2.11-2.2.0.jar: 2.2.0] at org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply $ mcV $ sp (ForEachDStream.scala: 50) ~ [spark-streaming_2.11-2.2.0.jar: 2.2.0] в org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply (ForEachDStream.scala: 50) ~ [spark-streaming_2.11-2.2.0.jar: 2.2.0] вorg.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply (ForEachDStream.scala: 50) ~ [spark-streaming_2.11-2.2.0.jar: 2.2.0] в scala.util.Try $.применить (Try.scala: 192) ~ [jaf-sdk-2.4.0.jar :?] в org.apache.spark.streaming.scheduler.Job.run (Job.scala: 39) ~ [spark-streaming_2.11-2.2.0.jar: 2.2.0] в org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.apply$ mcV $ sp (JobScheduler.scala: 257) ~ [spark-streaming_2.11-2.2.0.jar: 2.2.0] в org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.apply (JobScheduler.scala: 257) ~ [spark-streaming_2.11-2.2.0.jar: 2.2.0] в org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.apply (JobScheduler.scala: 257) ~ [spark-streaming_2.11-2.2.0.jar: 2.2.0] по адресу scala.util.DynamicVariable.withValue (DynamicVariable.scala: 58) ~ [jaf-sdk-2.4.0.jar:?] в org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler.run (JobScheduler.scala: 256) ~ [spark-streaming_2.11-2.2.0.jar: 2.2.0] в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) ~ [?: 1.8.0_161] в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) ~ [?: 1.8.0_161] в java.lang.Thread.run (Thread.java:748) ~ [?: 1.8.0_161]

1 Ответ

0 голосов
/ 22 мая 2018

Вы можете использовать wholeTextFiles, чтобы прочитать файл json, получить текст json и использовать его в json API SparkSession как

import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

static SparkSession spark = SparkSession.builder().master("local").appName("simple").getOrCreate();
static JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

Dataset<Row> df = spark.read().json(sc.wholeTextFiles("path to json file").map(t -> t._2()));
df.show(false);

, и вы должны получить

+----------+---------------+--------+--------------------------------------------+-----+-----------+-----+-------+------+-----+
|@tenant_id|alarmUpdateTime|alarm_id|alarm_updates                               |aucID|inID       |index|product|source|state|
+----------+---------------+--------+--------------------------------------------+-----+-----------+-----+-------+------+-----+
|XYZ       |1526342400000  |AB5C9123|[[Severity,Minor,Major], [state,UPDATE,NEW]]|5af83|INC15234567|test |test   |ABS   |NEW  |
+----------+---------------+--------+--------------------------------------------+-----+-----------+-----+-------+------+-----+

Вы можете использовать master и appName по своему усмотрению

Обновлено

Вы прокомментировали, что

The way you do through file , can we do it with the object . I have to convert to Ingest the data to the other T2

Для этого предположим, что у вас есть запись, прочитанная из темы T1 как строковый объект как

    String t1Record = "[\n" +
            "  {\n" +
            "    \"@tenant_id\":\"XYZ\",\n" +
            "    \"alarmUpdateTime\":1526342400000,\n" +
            "    \"alarm_id\":\"AB5C9123\",\n" +
            "    \"alarm_updates\":[\n" +
            "      {\n" +
            "        \"alarmField\":\"Severity\",\n" +
            "        \"new_value\":\"Minor\",\n" +
            "        \"old_value\":\"Major\"\n" +
            "      },\n" +
            "      {\n" +
            "        \"alarmField\":\"state\",\n" +
            "        \"new_value\":\"UPDATE\",\n" +
            "        \"old_value\":\"NEW\"\n" +
            "      }\n" +
            "    ],\n" +
            "    \"aucID\":\"5af83\",\n" +
            "    \"inID\":\"INC15234567\",\n" +
            "    \"index\":\"test\",\n" +
            "    \"product\":\"test\",\n" +
            "    \"source\":\"ABS\",\n" +
            "    \"state\":\"NEW\"\n" +
            "  }\n" +
            "]";

ивы конвертируете его в RDD как

    JavaRDD<String> t1RecordRDD = sc.parallelize(Arrays.asList(t1Record));

Затем вы можете применить json API для преобразования в dataframe как

    Dataset<Row> df = spark.read().json(t1RecordRDD);

, что должно дать вам тот же результат, что ивыше

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...