Я пытаюсь проверить код набора данных (набора данных) со строго типизированными вложенными классами случаев в массив данных, чтобы затем передать мои функции. Сериализация / создание фрейма данных продолжает терпеть неудачу, и у меня нет достаточного опыта, чтобы знать, что происходит в scala или spark.
Я думаю, что я пытаюсь определить схему, в то время как spark также определяет схему, и поскольку они не соответствуют ей, ошибки ??
Модель:
package io.swagger.client.model
import java.sql.Date
import scala.Enumeration
case class Member (
memberId: String,
memberIdSuffix: String,
memberSubscriberId: String,
memberEmpi: Option[Long] = None,
memberFirstName: String,
memberLastName: String,
memberMiddleInitial: Option[String] = None,
memberGender: String,
memberBirthDate: Date,
memberSocialSecurityNumber: Option[String] = None,
memeberPhoneNumbers: List[Telecom],
memberEmailAddresses: Option[List[Email]] = None,
memberAddresses: List[Address],
memberEligibilities: List[MemberEligibility]
)
case class Email (
address: String,
effectiveDate: Option[Date] = None,
terminationDate: Option[Date] = None,
isCurrent: Option[Boolean] = None,
isActive: Option[Boolean] = None
)
case class Address (
lineOne: String,
lineTwo: String,
cityName: String,
stateCode: String,
zipCode: String,
effectiveDate: Option[Date] = None,
terminationDate: Option[Date] = None,
isCurrent: Option[Boolean] = None,
isActive: Option[Boolean] = None
)
case class MemberEligibility (
productId: String,
productCategoryCode: String,
classId: String,
planId: String,
groupId: String,
maxCopayAmount: Option[Float] = None,
voidIndicator: Boolean,
healthplanEntryDate: Date,
memberStatusDescription: Option[String] = None,
eligibilityExplanation: Option[String] = None,
eligibilitySelectionLevelDescription: Option[String] = None,
eligibilityReason: Option[String] = None,
effectiveDate: Option[Date] = None,
terminationDate: Option[Date] = None,
isCurrent: Option[Boolean] = None,
isActive: Option[Boolean] = None
)
case class Telecom (
phoneNumber: String,
effectiveDate: Option[Date] = None,
terminationDate: Option[Date] = None,
isCurrent: Option[Boolean] = None,
isActive: Option[Boolean] = None,
telecomType: String
)
object Genders extends Enumeration {
val male, female, unknown, other = Value
}
object Gender extends Enumeration {
val home, work, fax = Value
}
Тестовый код:
import scala.util.{Try, Success, Failure}
import io.swagger.client.model._
import org.apache.spark.sql.{SparkSession, DataFrame, Dataset}
import org.apache.spark.SparkContext
import org.scalatest._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
trait SparkContextSetup {
def withSparkContext(testMethod: (SparkSession, SparkContext) => Any) {
val spark = org.apache.spark.sql.SparkSession.builder
.master("local")
.appName("Spark test")
.getOrCreate()
val sparkContext = spark.sparkContext
try {
testMethod(spark,sparkContext)
} finally sparkContext.stop()
}
}
class HelloSpec extends WordSpec with Matchers with SparkContextSetup {
"My analytics" should {
"calculate the right thing" in withSparkContext { (spark, sparkContext) =>
MockMemberData(spark)
}
}
private def MockMemberData(spark: SparkSession) = {
import spark.implicits._
import java.sql.{Date}
import java.text.SimpleDateFormat
import org.apache.spark.sql.types._
var testDate = Try(new SimpleDateFormat("dd/MM/yyyy").parse("01/01/2018"))
.map(d => new java.sql.Date(d.getTime()))
.get
val mockData = spark.sparkContext
.parallelize(
Seq(
Member(
memberId = "12345",
memberIdSuffix = "Mr.",
memberSubscriberId = "000000011",
memberEmpi = None,
memberFirstName = "firstname",
memberLastName = "lastname",
Some("w"),
Genders.male.toString,
testDate,
Some("123456789"),
List(
Telecom("12345678910", None, None, Some(true), Some(true), "")
),
Option(
List(
Email(
"test@gmail.com",
None,
Some(testDate),
isCurrent = Some(true),
isActive = Some(true)
)
)
),
List(
Address(
"10 Awesome Dr",
"",
"St. Louis",
"MO",
"63000",
None,
None,
None,
None
)
),
List(
MemberEligibility(
"productid",
"productCategoryCode",
"classId",
"planId",
"groupId",
None,
false,
testDate,
None,
None,
None,
None,
None,
None,
None
)
)
)
)
)
.toDF()
mockData.show()
}
}
Я ожидал получить схему информационного кадра (или набор данных в этом случае, что я и получил:
[info] HelloSpec:
[info] My analytics
[info] - should calculate the right thing *** FAILED ***
[info] org.apache.spark.sql.AnalysisException: cannot resolve 'wrapoption(staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue10, MapObjects_loopIsNull11, StructField(address,StringType,true), StructField(effectiveDate,DateType,true), StructField(terminationDate,DateType,true), StructField(isCurrent,BooleanType,true), StructField(isActive,BooleanType,true), if (isnull(lambdavariable(MapObjects_loopValue10, MapObjects_loopIsNull11, StructField(address,StringType,true), StructField(effectiveDate,DateType,true), StructField(terminationDate,DateType,true), StructField(isCurrent,BooleanType,true), StructField(isActive,BooleanType,true)))) null else newInstance(class io.swagger.client.model.Email), cast(memberEmailAddresses as array<struct<address:string,effectiveDate:date,terminationDate:date,isCurrent:boolean,isActive:boolean>>)).array, true), ObjectType(class scala.collection.immutable.List))' due to data type mismatch: argument 1 requires scala.collection.immutable.List type, however, 'staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue10, MapObjects_loopIsNull11, StructField(address,StringType,true), StructField(effectiveDate,DateType,true), StructField(terminationDate,DateType,true), StructField(isCurrent,BooleanType,true), StructField(isActive,BooleanType,true), if (isnull(lambdavariable(MapObjects_loopValue10, MapObjects_loopIsNull11, StructField(address,StringType,true), StructField(effectiveDate,DateType,true), StructField(terminationDate,DateType,true), StructField(isCurrent,BooleanType,true), StructField(isActive,BooleanType,true)))) null else newInstance(class io.swagger.client.model.Email), cast(memberEmailAddresses as array<struct<address:string,effectiveDate:date,terminationDate:date,isCurrent:boolean,isActive:boolean>>)).array, true)' is of scala.collection.Seq type.;
[info] at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
[info] at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:82)
[info] at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:310)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:310)
[info] at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:309)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5$$anonfun$apply$11.apply(TreeNode.scala:360)
[info] ...
ОБНОВЛЕНИЕ
так вместо
val mockData = spark.sparkContext
.parallelize(
Seq(
или
val mockData = spark.sparkContext
.parallelize(
List(
Использование массива работает?
val mockData = spark.sparkContext
.parallelize(
Array(
Почему Array работает, но Seq и List не работают?