Spark - скопировать поле, используя функции df.schema.copy для другого кадра данных - PullRequest
0 голосов
/ 03 января 2019

Мне нужно создать схему, используя существующее поле df.

Рассмотрим пример этого кадра данных

scala> case class prd (a:Int, b:Int)
defined class prd

scala> val df = Seq((Array(prd(10,20),prd(15,30),prd(20,25)))).toDF("items")
df: org.apache.spark.sql.DataFrame = [items: array<struct<a:int,b:int>>]

scala> df.printSchema
root
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: integer (nullable = false)
 |    |    |-- b: integer (nullable = false)

Мне нужно еще одно поле "items_day1", похожее на "items" для df2.Прямо сейчас я делаю это, как показано ниже, это обходной путь

scala> val df2=df.select('items,'items.as("item_day1"))
df2: org.apache.spark.sql.DataFrame = [items: array<struct<a:int,b:int>>, item_day1: array<struct<a:int,b:int>>]

scala> df2.printSchema
root
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: integer (nullable = false)
 |    |    |-- b: integer (nullable = false)
 |-- item_day1: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: integer (nullable = false)
 |    |    |-- b: integer (nullable = false)


scala>

Но как получить это, используя функции df.schema.add () или df.schema.copy ()?.

EDIT1:

Я пытаюсь, как показано ниже

val (a,b) = (df.schema,df.schema) // works
a("items")  //works
b.add(a("items").as("items_day1")) //Error.. 

1 Ответ

0 голосов
/ 04 января 2019

Чтобы добавить новое поле в схему DataFrame (которая имеет StructType ) с той же структурой, но с другим именем верхнего уровня существующего поля, вы можете скопировать StructField с измененным членом StructField name, как показано ниже:

import org.apache.spark.sql.types._

case class prd (a:Int, b:Int)

val df = Seq((Array(prd(10,20), prd(15,30), prd(20,25)))).toDF("items")

val schema = df.schema
// schema: org.apache.spark.sql.types.StructType = StructType(
//   StructField(items, ArrayType(
//     StructType(StructField(a,IntegerType,false), StructField(b,IntegerType,false)
//   ), true), true)
// )

val newSchema = schema.find(_.name == "items") match {
  case Some(field) => schema.add(field.copy(name = "items_day1"))
  case None        => schema
}
// newSchema: org.apache.spark.sql.types.StructType = StructType(
//   StructField(items, ArrayType(
//     StructType(StructField(a,IntegerType,false), StructField(b,IntegerType,false)
//   ), true), true),
//   StructField(items_day1, ArrayType(
//     StructType(StructField(a,IntegerType,false), StructField(b,IntegerType,false)
//   ), true), true)
// )
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...