DataType не соответствует исключению при загрузке данных в Spark DataFrame - PullRequest
0 голосов
/ 21 февраля 2019

У меня есть данные ниже, имеющие тип возвращаемого значения как Array (Array (AnyRef)).2-й ряд имеет тип данных базового набора данных.Я хочу прочитать данные в искровой датафрейм.Тем не менее, я получаю следующее исключение при этом.

Исключение в потоке "main" scala.MatchError: StringType (из класса java.lang.String)

Любые предложения по преодолению этого ????

var data = getData
val df = getTable(data, spark)
df.printSchema()
df.show()
def getData: Array[Array[AnyRef]] =

Array[Array[AnyRef]](Array("MS_FUND_ID", "MS_INVESTMENT_TYPE", "CURRENCY", "MS_CAT_ID", "date", "MS_NETFLOWS_FUND", "MS_AUM_FUND"),
  Array("StringType", "StringType", "StringType", "StringType", "StringType", "DoubleType", "DoubleType"),
  Array("F00000MLKR", "OE", "USD", "C1", "2017-10-31", "10", "15"),
  Array("F00000MLKS", "OE", "USD", "C1", "2017-10-31", "-10", "10"),
  Array("F00000MLKS", "OX", "USD", "C1", "2017-10-31", "-10", "10"),
  Array("F00000MLKT", "INS", "USD", "C1", "2017-10-31", "30", "50"))


def getTable(table: Array[Array[AnyRef]], spark: SparkSession): DataFrame = 

{ 

val fields = new util.ArrayList[StructField]
val fieldNames = table(0)
var fieldTypes = table(1)

fieldTypes = fieldTypes.map {
  case x: DoubleType => x.asInstanceOf[DoubleType]
  case x: StringType => x.asInstanceOf[StringType]
}
var f = 0
while ( {
  f < table(0).length
}) {
  var fn = fieldNames(f).toString
  var ft = fieldTypes(f).asInstanceOf[DataType]
  fields.add(StructField(fn, ft, true, Metadata.empty))

  {
    f += 1;
    f - 1
  }
}
val schema1 = new StructType(fields.toArray(new Array[StructField](fields.size)))
println(schema1)
val rows = new util.ArrayList[Row]
var r = 2
while ( {
  r < table.length
}) {
  val data = table(r)
  rows.add(new GenericRowWithSchema(data.asInstanceOf[Array[Any]], schema1))

  {
    r += 1;
    r - 1
  }
}
var i = 0
while ( {
  i < rows.size
}) {
  System.out.println("Rows : " + rows.get(i))

  {
    i += 1;
    i - 1
  }
}
val DF = spark.createDataFrame(rows, schema1)
DF

} '' '

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...