Создание фрейма данных в классе прототипа с полем bcl.DateTime не выбрасывает ничего, это не исключение термина - PullRequest
0 голосов
/ 10 июля 2020

У меня есть класс case, созданный из файла .proto через scalapb, в котором есть несколько полей с типом bcl.DateTime. Определение класса case выглядит следующим образом:

@SerialVersionUID(0L)
final case class EditorialAdEntity(
    customerid: _root_.scala.Int = 0,
    accountid: _root_.scala.Int = 0,
    orderId: _root_.scala.Long = 0L,
    entityId: _root_.scala.Long = 0L,
    dataFeedId: _root_.scala.Long = 0L,
    editorialStatusModifiedDTim: _root_.scala.Option[bcl.bcl.DateTime] = _root_.scala.None,
    modifiedDTim: _root_.scala.Option[bcl.bcl.DateTime] = _root_.scala.None,
    adTitle: _root_.scala.Predef.String = "",
    adDescription: _root_.scala.Predef.String = "",
    adDescription2: _root_.scala.Predef.String = "",
    displayURL: _root_.scala.Predef.String = "",
    businessName: _root_.scala.Predef.String = "",
...

Я могу создать экземпляр этого класса case и просмотреть его содержимое следующим образом:

val currentDt: DateTime = DateTime.of(value = Some(DateTimeUtils.getCurrentMillis), kind = Some(DateTimeKind.UTC), scale = Some(TimeSpanScale.MILLISECONDS))
val entity: EditorialAdEntity = EditorialAdEntity(customerid = customerId, accountid = accountId, adTitle = "test",
          orderId = orderId, serviceLevelId = 5, campaignType = campaignType,
          createdDtim = Some(currentDt), modifiedDTim = Some(currentDt),
          editorialStatusModifiedDTim = Some(currentDt) )
        
Logger.logInfo(entity.toProtoString)

Однако, когда я создаю Spark dataframe поверх этого, как показано ниже:

val data = spark.sqlContext.createDataFrame(List(entity))
data.show()

Я получаю следующую ошибку:

Exception in thread "main" scala.ScalaReflectionException: <none> is not a term
    at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
    at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:985)
    at org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:46)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:965)
    at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:46)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:782)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:724)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:723)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:737)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:724)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:723)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1$$anonfun$apply$8.apply(ScalaReflection.scala:785)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1$$anonfun$apply$8.apply(ScalaReflection.scala:784)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:784)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:724)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:723)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:737)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:724)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:723)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1$$anonfun$apply$8.apply(ScalaReflection.scala:785)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1$$anonfun$apply$8.apply(ScalaReflection.scala:784)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:784)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:724)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:723)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:720)
    at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:313)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:285)
    at Scripts.SampleScripts.Protobuf.demo.EnforcementProtoTester$.main(EnforcementProtoTester.scala:43)
    at Scripts.SampleScripts.Protobuf.demo.EnforcementProtoTester.main(EnforcementProtoTester.scala)

Если я удалю поля DateTime из прото-класса, все будет нормально. Есть какие-нибудь указатели на то, как создавать фреймы данных поверх прототипов с полями bcl.DateTime?

1 Ответ

0 голосов
/ 11 июля 2020

Чтобы использовать сгенерированный ScalaPB класс со Spark, вам необходимо добавить зависимость библиотеки от sparksql-scalapb и использовать ProtoSQL.createDataFrame() вместо spark.sqlContext.createDataFrame. Процесс описан здесь: https://scalapb.github.io/sparksql.html#using -spark sql -scalapb

...