Ошибка набора данных Scala Spark на вложенном объекте - PullRequest
1 голос
/ 20 апреля 2019

Я пытаюсь проверить код набора данных (набора данных) со строго типизированными вложенными классами случаев в массив данных, чтобы затем передать мои функции. Сериализация / создание фрейма данных продолжает терпеть неудачу, и у меня нет достаточного опыта, чтобы знать, что происходит в scala или spark.

Я думаю, что я пытаюсь определить схему, в то время как spark также определяет схему, и поскольку они не соответствуют ей, ошибки ??

Модель:


package io.swagger.client.model

import java.sql.Date
import scala.Enumeration

case class Member (
  memberId: String,
  memberIdSuffix: String,
  memberSubscriberId: String,
  memberEmpi: Option[Long] = None,
  memberFirstName: String,
  memberLastName: String,
  memberMiddleInitial: Option[String] = None,
  memberGender: String,
  memberBirthDate: Date,
  memberSocialSecurityNumber: Option[String] = None,
  memeberPhoneNumbers: List[Telecom],
  memberEmailAddresses: Option[List[Email]] = None,
  memberAddresses: List[Address],
  memberEligibilities: List[MemberEligibility]
)
case class Email (
  address: String,
  effectiveDate: Option[Date] = None,
  terminationDate: Option[Date] = None,
  isCurrent: Option[Boolean] = None,
  isActive: Option[Boolean] = None
)
case class Address (
  lineOne: String,
  lineTwo: String,
  cityName: String,
  stateCode: String,
  zipCode: String,
  effectiveDate: Option[Date] = None,
  terminationDate: Option[Date] = None,
  isCurrent: Option[Boolean] = None,
  isActive: Option[Boolean] = None
)
case class MemberEligibility (
  productId: String,
  productCategoryCode: String,
  classId: String,
  planId: String,
  groupId: String,
  maxCopayAmount: Option[Float] = None,
  voidIndicator: Boolean,
  healthplanEntryDate: Date,
  memberStatusDescription: Option[String] = None,
  eligibilityExplanation: Option[String] = None,
  eligibilitySelectionLevelDescription: Option[String] = None,
  eligibilityReason: Option[String] = None,
  effectiveDate: Option[Date] = None,
  terminationDate: Option[Date] = None,
  isCurrent: Option[Boolean] = None,
  isActive: Option[Boolean] = None
)
case class Telecom (
  phoneNumber: String,
  effectiveDate: Option[Date] = None,
  terminationDate: Option[Date] = None,
  isCurrent: Option[Boolean] = None,
  isActive: Option[Boolean] = None,
  telecomType: String
)


object Genders extends Enumeration {
    val male, female, unknown, other = Value
}
object Gender extends Enumeration  {
    val home, work, fax = Value 
}

Тестовый код:



import scala.util.{Try, Success, Failure}
import io.swagger.client.model._
import org.apache.spark.sql.{SparkSession, DataFrame, Dataset}
import org.apache.spark.SparkContext
import org.scalatest._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

trait SparkContextSetup {
  def withSparkContext(testMethod: (SparkSession, SparkContext) => Any) {

    val spark = org.apache.spark.sql.SparkSession.builder
            .master("local")
            .appName("Spark test")
            .getOrCreate()
    val sparkContext = spark.sparkContext
    try {
      testMethod(spark,sparkContext)
    } finally sparkContext.stop()
  }
}


class HelloSpec extends WordSpec with Matchers with SparkContextSetup {

  "My analytics" should {
    "calculate the right thing" in withSparkContext { (spark, sparkContext) =>
      MockMemberData(spark)
    }
  }

  private def MockMemberData(spark: SparkSession) = {
    import spark.implicits._
    import java.sql.{Date}
    import java.text.SimpleDateFormat
    import org.apache.spark.sql.types._

    var testDate = Try(new SimpleDateFormat("dd/MM/yyyy").parse("01/01/2018"))
      .map(d => new java.sql.Date(d.getTime()))
      .get

    val mockData = spark.sparkContext
      .parallelize(
        Seq(
          Member(
            memberId = "12345",
            memberIdSuffix = "Mr.",
            memberSubscriberId = "000000011",
            memberEmpi = None,
            memberFirstName = "firstname",
            memberLastName = "lastname",
            Some("w"),
            Genders.male.toString,
            testDate,
            Some("123456789"),
            List(
              Telecom("12345678910", None, None, Some(true), Some(true), "")
            ),
            Option(
              List(
                Email(
                  "test@gmail.com",
                  None,
                  Some(testDate),
                  isCurrent = Some(true),
                  isActive = Some(true)
                )
              )
            ),
            List(
              Address(
                "10 Awesome Dr",
                "",
                "St. Louis",
                "MO",
                "63000",
                None,
                None,
                None,
                None
              )
            ),
            List(
              MemberEligibility(
                "productid",
                "productCategoryCode",
                "classId",
                "planId",
                "groupId",
                None,
                false,
                testDate,
                None,
                None,
                None,
                None,
                None,
                None,
                None
              )
            )
          )
        )
      )
      .toDF()
    mockData.show()
  }
}

Я ожидал получить схему информационного кадра (или набор данных в этом случае, что я и получил:

[info] HelloSpec:
[info] My analytics
[info] - should calculate the right thing *** FAILED ***
[info]   org.apache.spark.sql.AnalysisException: cannot resolve 'wrapoption(staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue10, MapObjects_loopIsNull11, StructField(address,StringType,true), StructField(effectiveDate,DateType,true), StructField(terminationDate,DateType,true), StructField(isCurrent,BooleanType,true), StructField(isActive,BooleanType,true), if (isnull(lambdavariable(MapObjects_loopValue10, MapObjects_loopIsNull11, StructField(address,StringType,true), StructField(effectiveDate,DateType,true), StructField(terminationDate,DateType,true), StructField(isCurrent,BooleanType,true), StructField(isActive,BooleanType,true)))) null else newInstance(class io.swagger.client.model.Email), cast(memberEmailAddresses as array<struct<address:string,effectiveDate:date,terminationDate:date,isCurrent:boolean,isActive:boolean>>)).array, true), ObjectType(class scala.collection.immutable.List))' due to data type mismatch: argument 1 requires scala.collection.immutable.List type, however, 'staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue10, MapObjects_loopIsNull11, StructField(address,StringType,true), StructField(effectiveDate,DateType,true), StructField(terminationDate,DateType,true), StructField(isCurrent,BooleanType,true), StructField(isActive,BooleanType,true), if (isnull(lambdavariable(MapObjects_loopValue10, MapObjects_loopIsNull11, StructField(address,StringType,true), StructField(effectiveDate,DateType,true), StructField(terminationDate,DateType,true), StructField(isCurrent,BooleanType,true), StructField(isActive,BooleanType,true)))) null else newInstance(class io.swagger.client.model.Email), cast(memberEmailAddresses as array<struct<address:string,effectiveDate:date,terminationDate:date,isCurrent:boolean,isActive:boolean>>)).array, true)' is of scala.collection.Seq type.;
[info]   at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:82)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:310)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:310)
[info]   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:309)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5$$anonfun$apply$11.apply(TreeNode.scala:360)
[info]   ...

ОБНОВЛЕНИЕ

так вместо

val mockData = spark.sparkContext
      .parallelize(
        Seq(

или

val mockData = spark.sparkContext
      .parallelize(
        List(

Использование массива работает?

val mockData = spark.sparkContext
      .parallelize(
        Array(

Почему Array работает, но Seq и List не работают?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...