Создание типов столбцов для схемы - PullRequest
0 голосов
/ 02 октября 2018

У меня есть текстовый файл, из которого я читаю и анализирую для создания кадра данных.Однако столбцы amount и code должны иметь тип IntegerTypes.Вот что у меня есть:

def getSchema: StructType = {
        StructType(Seq(
          StructField("carrier", StringType, false),
          StructField("amount", StringType, false),
          StructField("currency", StringType, false),
          StructField("country", StringType, false),
          StructField("code", StringType, false),
        ))
      }

  def getRow(x: String): Row = {
    val columnArray = new Array[String](5)
    columnArray(0) = x.substring(40, 43)
    columnArray(1) = x.substring(43, 46)
    columnArray(2) = x.substring(46, 51)
    columnArray(3) = x.substring(51, 56)
    columnArray(4) = x.substring(56, 64)
    Row.fromSeq(columnArray)
  }

Поскольку я определил Array[String], столбцы могут быть только StringTypes, а не множеством как String и Integer.Чтобы подробно объяснить мою проблему, вот что происходит:

Сначала я создаю пустой фрейм данных:

  var df = spark.sqlContext.createDataFrame(spark.sparkContext.emptyRDD[Row], getSchema)

Затем у меня есть цикл for, который проходит через каждый файл во всех каталогах.Примечание: мне нужно проверить каждый файл, и я не могу прочитать все сразу.

for (each file parse):
  df2  = spark.sqlContext.createDataFrame(spark.sparkContext.textFile(inputPath)
    .map(x => getRow(x)), schema)
df = df.union(df2)

Теперь у меня есть полный фрейм данных всех файлов.Однако столбцы amount и code по-прежнему являются типами StringTypes.Как я могу сделать так, чтобы они были IntegerTypes?

Обратите внимание: я не могу привести столбцы во время цикла for, потому что это занимает много времени.Я бы хотел, чтобы текущая структура была максимально похожей.В конце цикла for я мог бы привести столбцы как IntegerTypes, однако, что если столбец содержит значение, которое не является Integer?Я хотел бы, чтобы столбцы не были NULL.

Есть ли способ сделать 2 указанных столбца IntegerTypes без добавления большого количества изменений в код?

1 Ответ

0 голосов
/ 02 октября 2018

Как насчет использования наборов данных?

Сначала создайте класс наблюдений, моделирующий ваши данные:

case class MyObject(
    carrier: String,
    amount: Double,
    currency: String,
    country: String,
    code: Int)

создайте другой класс наблюдений, добавив в первый класс дополнительную информацию (потенциальные ошибки, исходный файл).):

case class MyObjectWrapper(
                      myObject: Option[MyObject],
                      someError: Option[String],
                      source: String
                      )

Затем создайте парсер, преобразуя строку из вашего файла в myObject:

object Parser {
  def parse(line: String, file: String): MyObjectWrapper = {
    Try {
      MyObject(
        carrier = line.substring(40, 43),
        amount = line.substring(43, 46).toDouble,
        currency = line.substring(46, 51),
        country = line.substring(51, 56),
        code = line.substring(56, 64).toInt)
    } match {
      case Success(objectParsed) => MyObjectWrapper(Some(objectParsed), None, file)
      case Failure(error) => MyObjectWrapper(None, Some(error.getLocalizedMessage), file)
    }
  }
}

Наконец, проанализируйте ваши файлы:

import org.apache.spark.sql.functions._
val ds = files
  .filter( {METHOD TO SELECT CORRECT FILES})
  .map( { GET INPUT PATH FROM FILES} )
  .map(path => spark.read.textFile(_).map(Parser.parse(_, path))
  .reduce(_.union(_))

Thisдолжен предоставить вам набор данных [MyObjectWrapper] с типами и API, которые вы хотите.

После этого вы можете взять те, которые вы можете проанализировать:

ds.filter(_.someError == None)

Или взять те, которые вы не смогли проанализировать (для расследования):

ds.filter(_.someError != None)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...