Искровой отвал на паркет с колонной в виде массива конструкций - PullRequest
0 голосов
/ 03 марта 2020

Мне нужно загрузить CSV-файл, имеющий столбец с массивом структур, и выгрузить его в другое место в формате паркета. В моем файле csv есть два столбца, столбцы A и B. Тип данных столбца B: array<struct<x: bigint, y:bigint>>

. Я попытался загрузить файл csv со следующей схемой:

val schemaB = ArrayType(StructType(Seq(StructField("x",LongType),StructField("y",LongType))))
val schema = new StructType().add("A",StringType).add("B",schemaB)
spark.read.option("sep", "\t").schema(schema).csv(<location>)

Однако это не сработало. Я получил следующую ошибку:

org.apache.spark.sql.AnalysisException: CSV data source does not support array<struct<x:bigint,y:bigint>&gt; data type.;</struct<x:bigint,y:bigint>

Я даже пытался привести к требуемому типу, но это не сработало.

Это пример того, как выглядит столбец B:

|B                                                                                                                                                                                                                                                                                                                                                                                                                   |
+---------------------------------------------------------------------------------------------+
|68222:102332,21215:1000,10982:70330,|
|93302:13320,263721:902615,9382:100020,|

1 Ответ

1 голос
/ 03 марта 2020

Вы можете использовать функцию transform, если используете последнюю версию spark , т. Е. 2,4 +

Сначала читать как строку, split на ",", чтобы получить список и снова split на ":", чтобы получить x и y

val schema = new StructType().add("A",StringType).add("B",StringType)
val df = spark.read.option("delimiter", "\t").schema(schema).csv("path to csv")
val splitExpr =  expr("transform(split(B, ','), x -> (split(x, ':')[0] as x, split(x, ':')[1] as y))")

val result = df.select($"A", splitExpr.cast("array<struct<x: long, y:long>>") as "B" )

Теперь это можно сохранить в паркете. Если вы используете более старую версию spark, то вам нужно написать окончательную схему udf. :

root
 |-- A: string (nullable = true)
 |-- B: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- x: long (nullable = true)
 |    |    |-- y: long (nullable = true)
...