Получение вложенного столбца в фрейме данных Python Spark - PullRequest
0 голосов
/ 09 апреля 2019

Часть моей схемы df:

-- result: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- error: string (nullable = true)
 |    |    |-- hop: long (nullable = true)
 |    |    |-- resuLt: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- from: string (nullable = true)
 |    |    |    |    |-- rtt: double (nullable = true)
 |    |    |    |    |-- size: long (nullable = true)
 |    |    |    |    |-- ttl: long (nullable = true)
 |    |    |-- result: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Rtt: double (nullable = true)
 |    |    |    |    |-- Ttl: long (nullable = true)
 |    |    |    |    |-- dstoptsize: long (nullable = true)
 |    |    |    |    |-- dup: boolean (nullable = true)
 |    |    |    |    |-- edst: string (nullable = true)
 |    |    |    |    |-- err: string (nullable = true)
 |    |    |    |    |-- error: string (nullable = true)
 |    |    |    |    |-- flags: string (nullable = true)
 |    |    |    |    |-- from: string (nullable = true)
 |    |    |    |    |-- hdropts: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- mss: long (nullable = true)
 |    |    |    |    |-- icmpext: struct (nullable = true)
 |    |    |    |    |    |-- obj: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- class: long (nullable = true)
 |    |    |    |    |    |    |    |-- mpls: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |    |-- exp: long (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- label: long (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- s: long (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- ttl: long (nullable = true)
 |    |    |    |    |    |    |    |-- type: long (nullable = true)
 |    |    |    |    |    |-- rfc4884: long (nullable = true)
 |    |    |    |    |    |-- version: long (nullable = true)
 |    |    |    |    |-- itos: long (nullable = true)
 |    |    |    |    |-- ittl: long (nullable = true)
 |    |    |    |    |-- late: long (nullable = true)
 |    |    |    |    |-- mtu: long (nullable = true)
 |    |    |    |    |-- rtt: double (nullable = true)
 |    |    |    |    |-- sIze: long (nullable = true)
 |    |    |    |    |-- size: long (nullable = true)
 |    |    |    |    |-- tos: long (nullable = true)
 |    |    |    |    |-- ttl: long (nullable = true)
 |    |    |    |    |-- x: string (nullable = true)

Как я могу запросить вложенный столбец, например, result.result.dstopsize?Я хотел бы иметь возможность отображать все, начиная с result или даже result.result или result.resuLt (чувствителен к регистру в моей конфигурации свечи)

Когда я пытаюсь:

file_df.select("result.resuLt.dstopsize").show(10)

Я получаю эту ошибку:

cannot resolve '`result`.`resuLt`['dstopsize']' due to data type mismatch: argument 2 requires integral type, however, ''dstopsize'' is of string type.;;

РЕДАКТИРОВАТЬ: вот некоторые примеры данных

|_corrupt_record| af|       dst_addr|       dst_name|   endtime|         from|  fw|group_id|lts|  msm_id|  msm_name|paris_id|prb_id|proto|              result|size|     src_addr| timestamp| ttr|      type|
+---------------+---+---------------+---------------+----------+-------------+----+--------+---+--------+----------+--------+------+-----+--------------------+----+-------------+----------+----+----------+
|           null|  4|213.133.109.134|213.133.109.134|1551658584|78.197.253.14|4940|    null| 71|    5019|Traceroute|       3| 13230|  UDP|[[, 1,, [[,,,,,,,...|  40|192.168.0.130|1551658577|null|traceroute|
|           null|  4|   37.143.33.15|   37.143.33.15|1551658584|78.197.253.14|4940|15254159| 71|15254159|Traceroute|      12| 13230| ICMP|[[, 1,, [[,,,,,,,...|  48|192.168.0.130|1551658583|null|traceroute|
|           null|  4|  139.162.27.28|  139.162.27.28|1551658612|78.197.253.14|4940|    null| 20|    5027|Traceroute|       3| 13230|  UDP|[[, 1,, [[,,,,,,,...|  40|192.168.0.130|1551658606|null|traceroute|
|           null|  4|    45.33.72.12|    45.33.72.12|1551658610|78.197.253.14|4940|    null| 18|    5029|Traceroute|       3| 13230|  UDP|[[, 1,, [[,,,,,,,...|  40|192.168.0.130|1551658608|null|traceroute|
|           null|  4|104.237.152.132|104.237.152.132|1551658615|78.197.253.14|4940|    null| 23|    5028|Traceroute|       3| 13230|  UDP|[[, 1,, [[,,,,,,,...|  40|192.168.0.130|1551658608|null|traceroute|
|           null|  4|  94.126.208.18|  94.126.208.18|1551658516|37.14.215.183|4940| 9183324| 20| 9183324|Traceroute|      15| 11958| ICMP|[[, 1,, [[,,,,,,,...|  48| 192.168.22.2|1551658439|null|traceroute|
|           null|  4|196.192.112.244|196.192.112.244|1551658554|37.14.215.183|4940| 9181461| 25| 9181461|Traceroute|      15| 11958| ICMP|[[, 1,, [[,,,,,,,...|  48| 192.168.22.2|1551658474|null|traceroute|
|           null|  4|    46.234.34.8|    46.234.34.8|1551658539|37.14.215.183|4940| 9180758| 10| 9180758|Traceroute|      15| 11958| ICMP|[[, 1,, [[,,,,,,,...|  48| 192.168.22.2|1551658479|null|traceroute|
|           null|  4|    185.2.64.76|    185.2.64.76|1551658560|37.14.215.183|4940| 9181290| 31| 9181290|Traceroute|      15| 11958| ICMP|[[, 1,, [[,,,,,,,...|  48| 192.168.22.2|1551658511|null|traceroute|
|           null|  4|  208.80.155.69|  208.80.155.69|1551658597|37.14.215.183|4940| 9183716|  8| 9183716|Traceroute|      15| 11958| ICMP|[[, 1,, [[,,,,,,,...|  48| 192.168.22.2|1551658546|null|traceroute|
+---------------+---+---------------+---------------+----------+-------------+----+--------+---+--------+----------+--------+------+-----+--------------------+----+-------------+----------+----+----------+```

Ответы [ 2 ]

0 голосов
/ 09 апреля 2019

Тип result относится к типу array, поэтому вам потребуется explode или explode_outer, а затем получить доступ ко всему, что вам нужно для доступа.

from pyspark.sql.functions import explode_outer, col
file_df.withColumn("exploded_result", explode_outer(col("result")))
       .select("exploded_result.resuLt.dstopsize").show(10)

Будьте осторожны, у вас будет несколько строк, соответствующих отдельной строке в зависимости от количества элементов.

0 голосов
/ 09 апреля 2019

Это ужасно уродливо, но это вернет правильный ответ, если вы знаете, какие индексы столбцов являются вашими вложенными полями.Это может быть вычислено на основе схемы.

case class C(x: String, dstoptsize: Long, y: String)
case class B(result: Array[C])
case class A(result: Array[B])

val df = List(
    A(Array(
      B(Array(C("x10", 10, "y10"), C("x11", 11, "y11"))),
      B(Array(C("x12", 12, "y12"), C("x13", 13, "y13")))
      )), 
    A(Array(B(Array(C("x20", 20, "y20"), C("x21", 21, "y21")))))).toDF


val selectInner = udf((x: Seq[Row]) => { x.map(_.getSeq[Row](0).map(_.getLong(1))) })

df.select(selectInner($"result")).show

+--------------------+
|         UDF(result)|
+--------------------+
|[[10, 11], [12, 13]]|
|          [[20, 21]]|
+--------------------+

...