создать пустой массив-столбец данной схемы в Spark - PullRequest
0 голосов
/ 27 июня 2018

Из-за того, что паркет не может парсировать пустые массивы, я заменил пустые массивы на ноль, прежде чем писать таблицу. Теперь, когда я читаю таблицу, я хочу сделать наоборот:

У меня есть DataFrame со следующей схемой:

|-- id: long (nullable = false)
 |-- arr: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- x: double (nullable = true)
 |    |    |-- y: double (nullable = true)

и следующий контент:

+---+-----------+
| id|        arr|
+---+-----------+
|  1|[[1.0,2.0]]|
|  2|       null|
+---+-----------+

Я бы хотел заменить пустой массив (id = 2) пустым массивом, т.е.

+---+-----------+
| id|        arr|
+---+-----------+
|  1|[[1.0,2.0]]|
|  2|         []|
+---+-----------+

Я пробовал:

val arrSchema = df.schema(1).dataType

df
.withColumn("arr",when($"arr".isNull,array().cast(arrSchema)).otherwise($"arr"))
.show()

, что дает:

java.lang.ClassCastException: org.apache.spark.sql.types.NullType $ не может быть приведен к org.apache.spark.sql.types.StructType

Edit: я не хочу "жестко кодировать" любую схему моего столбца массива (по крайней мере, не схему структуры), потому что это может варьироваться от случая к случаю. Я могу использовать только информацию схемы из df во время выполнения

Кстати, я использую Spark 2.1, поэтому не могу использовать typedLit

Ответы [ 4 ]

0 голосов
/ 02 июля 2018

UDF с классом case также может быть интересным:

case class Item(x: Double, y: Double)
val udf_emptyArr = udf(() => Seq[Item]())
df
.withColumn("arr",coalesce($"arr",udf_emptyArr()))
.show()
0 голосов
/ 27 июня 2018

Одним из способов является использование UDF:

val arrSchema = df.schema(1).dataType // ArrayType(StructType(StructField(x,DoubleType,true), StructField(y,DoubleType,true)),true)

val emptyArr = udf(() => Seq.empty[Any],arrSchema)

df
.withColumn("arr",when($"arr".isNull,emptyArr()).otherwise($"arr"))
.show()

+---+-----------+
| id|        arr|
+---+-----------+
|  1|[[1.0,2.0]]|
|  2|         []|
+---+-----------+
0 голосов
/ 28 июня 2018

Другой подход заключается в использовании coalesce:

val df = Seq(
  (Some(1), Some(Array((1.0, 2.0)))),
  (Some(2), None)
).toDF("id", "arr")

df.withColumn("arr", coalesce($"arr", typedLit(Array.empty[(Double, Double)]))).
  show
// +---+-----------+
// | id|        arr|
// +---+-----------+
// |  1|[[1.0,2.0]]|
// |  2|         []|
// +---+-----------+
0 голосов
/ 27 июня 2018
  • Spark 2.2+ с известным внешним типом

    Как правило, вы можете использовать typedLit для предоставления пустых массивов.

    import org.apache.spark.sql.functions.typedLit
    
    typedLit(Seq.empty[(Double, Double)])
    

    Чтобы использовать определенные имена для вложенных объектов, вы можете использовать case-классы:

    case class Item(x: Double, y: Double)
    
    typedLit(Seq.empty[Item])
    

    или переименование по касту :

    typedLit(Seq.empty[(Double, Double)])
      .cast("array<struct<x: Double, y: Double>>")
    
  • Spark 2.1+ только со схемой

    Только со схемой вы можете попробовать:

    val schema = StructType(Seq(
      StructField("arr", StructType(Seq(
        StructField("x", DoubleType),
        StructField("y", DoubleType)
      )))
    ))
    
    def arrayOfSchema(schema: StructType) =
      from_json(lit("""{"arr": []}"""), schema)("arr")
    
    arrayOfSchema(schema).alias("arr")
    

    , где schema можно извлечь из существующего DataFrame и обернуть дополнительным StructType:

    StructType(Seq(
      StructField("arr", df.schema("arr").dataType)
    ))
    
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...