Я запускаю следующий код:
import com.holdenkarau.spark.testing.DatasetSuiteBase
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.{Encoders, Row}
import org.scalatest.FlatSpec
case class Inner(i: Int)
case class Outer(in: Inner)
class MyTest extends FlatSpec with DatasetSuiteBase {
behavior of "Engine"
it should "work" in {
import spark.implicits._
val input = Seq("alice", "bob").toDF("name")
val schema = Encoders.product[Outer].schema
implicit val enc = Encoders.kryo[Row]
val processed = input
.map { row =>
new GenericRowWithSchema(Array(Outer(Inner(row.getString(0).length))), schema): Row
}
processed.printSchema() //1
processed.show //2
val withSchema = spark.createDataFrame(processed.rdd, schema)
withSchema.printSchema //3
withSchema.show // throws exception
}
}
Результат от 1
root
|-- value: binary (nullable = true)
Результат от 2
+--------------------+
| value|
+--------------------+
|[01 00 6F 72 67 2...|
|[01 00 6F 72 67 2...|
+--------------------+
Результат от 3
root
|-- in: struct (nullable = true)
| |-- i: integer (nullable = false)
4
выдает исключение
Outer is not a valid external type for schema of struct<i:int>
Кто-нибудь знает, что здесь не так? Это вообще возможно в Spark?
@ редактировать
Повторно реализовано
it should "find minimal example" in {
import spark.implicits._
val input = Seq("alice", "bob").toDF("name")
val schema = Encoders.product[Outer].schema
implicit val enc = RowEncoder(schema)
val processed = input.map { row => Row(Outer(Inner(row.getString(0).length))) }
processed.printSchema()
processed.show
}