Spark -scala создает DataFrame из файла json и удаляет основные и вложенные элементы - PullRequest
0 голосов
/ 30 октября 2018

У меня есть файл json, который содержит объекты json, каждый объект за строкой. У меня есть следующая схема для этих объектов:

root
   |-- endtime: long (nullable = true)
   |-- result: array (nullable = true)
   |    |-- element: struct (containsNull = true)
   |    |    |-- hop: long (nullable = true)
   |    |    |-- result: array (nullable = true)
   |    |    |    |-- element: struct (containsNull = true)
   |    |    |    |    |-- from: string (nullable = true)
   |    |    |    |    |-- rtt: double (nullable = true)
   |    |    |    |    |-- size: long (nullable = true)
   |    |    |    |    |-- ttl: long (nullable = true)
   |    |    |    |    |-- x: string (nullable = true)

Вопрос: Как я могу создать новый DataFrame из Dataframe, содержащий данные в файле json, заданные в качестве входных данных, и удалить данные в виде ttl и x?

   |    |    |    |    |-- ttl: long (nullable = true)
   |    |    |    |    |-- x: string (nullable = true)

Учитывая, что я новичок в Spark (Scala), я не знаю, каковы возможные пути!

Было просто удалить конечное время:

val pathToTraceroutesExamples = getClass.getResource("/test/sample_1.json")
val df = spark.read.json(pathToTraceroutesExamples.getPath)

// Displays the content of the DataFrame to stdout
df.show()
df.printSchema()

var newDf = df.drop("endtime")

Ответы [ 2 ]

0 голосов
/ 31 октября 2018

Идея @Kris - это Истина; взорваться, а затем упасть. Я нашел пример здесь .

Я изменил результат имени атрибута, потому что у меня есть другое имя результата, чтобы избежать путаницы при разнесении:

Шаг 1: (вход)

 |-- timestamp: long (nullable = true)
 |-- hopDetails: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- hop: long (nullable = true)
 |    |    |-- result: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- from: string (nullable = true)
 |    |    |    |    |-- rtt: double (nullable = true)
 |    |    |    |    |-- size: long (nullable = true)
 |    |    |    |    |-- ttl: long (nullable = true)

Шаг 2: Код:

    var exploded_1 = renamed_newDF
             .withColumn("hop", explode(renamed_newDF("hopDetails.hop")))
             .withColumn("result", explode(renamed_newDF("hopDetails.result")))
             .drop("hopDetails")
    exploded_1.printSchema

Схема вывода:

 |-- timestamp: long (nullable = true)
 |-- hop: long (nullable = true)
 |-- result: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- from: string (nullable = true)
 |    |    |-- rtt: double (nullable = true)
 |    |    |-- size: long (nullable = true)
 |    |    |-- ttl: long (nullable = true)

Шаг 3:

Код:

var exploded_2 = exploded_1
  .withColumn("from", explode(exploded_1("result.from")))
  .withColumn("rtt", explode(exploded_1("result.rtt")))
  .withColumn("size", explode(exploded_1("result.size")))
  .withColumn("ttl", explode(exploded_1("result.ttl")))
  .drop("result")

exploded_2.printSchema

Схема:

    root
   |-- af: long (nullable = true)
   |-- dst_addr: string (nullable = true)
   |-- from: string (nullable = true)
   |-- msm_id: long (nullable = true)
   |-- prb_id: long (nullable = true)
   |-- src_addr: string (nullable = true)
   |-- timestamp: long (nullable = true)
   |-- hop: long (nullable = true)
   |-- rtt: double (nullable = true)
   |-- size: long (nullable = true)
   |-- ttl: long (nullable = true)
0 голосов
/ 30 октября 2018

explode и drop сделают свое дело. Сначала explode результат первого уровня, а затем explode результат второго уровня из результирующего кадра данных. Наконец drop столбцы.

Например,

val newDF = df
  .select(df(“*”), explode(df(“result”)).alias(“result_exp”))
  .drop(“ttl”).drop(“x”)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...