Spark -Scala - конвертировать CSV-файл в пользовательский объект - PullRequest
0 голосов
/ 09 октября 2018

Как преобразовать данные CSV в пользовательский объект в спарк.Ниже приведен фрагмент моего кода

val sparkSession = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .master("local[2]")
      .getOrCreate()

    val citiData = sparkSession.read.option("header", "true").option("inferSchema", "true").csv(filePath) // removing header,and applying schema

    //citiData.describe().show()
    import sparkSession.implicits._
    val s: Dataset[CityData] = citiData.as[CityData]

  }
  //Date,Open,High,Low,Close,Volume
  case class CityData(processingDate: java.util.Date, Open: Double, High: Double, Low: Double, Volume: Double)

Пример набора данных:

Date,Open,High,Low,Close,Volume
2006-01-03,490.0,493.8,481.1,492.9,1537660
2006-01-04,488.6,491.0,483.5,483.8,1871020
2006-01-05,484.4,487.8,484.0,486.2,1143160
2006-01-06,488.8,489.0,482.0,486.2,1370250

Я перешел на тип входного параметра CityData класса case на String, тогда он вызывает «не удается разрешить» processingDate«заданные входные столбцы: [Объем, Закрыть, Высокий, Дата, Низкий, Открытый];»исключение.

  1. Как мне создать пользовательский объект
  2. Еще один хитрый способ конвертировать в объект Date

Как я могу это сделать?Пожалуйста, поделитесь своими идеями.

1 Ответ

0 голосов
/ 09 октября 2018

В вашем случае, если вы не установите опцию header в значение true, Spark будет читать столбцы с типом String.С параметром header вы можете видеть:

val df = sqlContext.read.option("header", true).option("inferSchema", true).csv("pathToFile")
df.printSchema
//Prints
root
|-- Date: timestamp (nullable = true)
|-- Open: double (nullable = true)
|-- High: double (nullable = true)
|-- Low: double (nullable = true)
|-- Close: double (nullable = true)
|-- Volume: integer (nullable = true)

Если вы попытаетесь преобразовать строки в CityData, вы получите следующую ошибку:

java.lang.UnsupportedOperationException: No Encoder found for java.util.Date

Это означает, что вы не можете конвертировать TimestampType непосредственно в java.util.Date.Вот сопоставления типов;

  • TimestampType => java.sql.Timestamp
  • DateType => java.sql.Date

После изменения типаprocessingDate от java.util.Date до java.sql.Timestamp, вы все равно получите сообщение об ошибке cannot resolve 'processingDate'.Вам также необходимо изменить имя поля processingDate на Date в CityData.затем вы можете преобразовать ваш набор данных в Dataset[CityData], используя df.as[CityData].Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...