Чтение файла CSV в искре различных столбцов - PullRequest
3 голосов
/ 25 марта 2019

Я хотел бы прочитать CSV-файл в dataframe в спарк с использованием Scala. Мой CSV-файл имеет первую запись, которая имеет три столбца, а остальные записи имеют 5 столбцов. Мой CSV-файл не поставляется с именами столбцов. Я упомянул здесь для понимания

Ex:
I'dtype  date             recordsCount
0          13-02-2015  300
I'dtype  date          type      location.     locationCode
1         13-02-2015.    R.          USA.            Us
1.        13-02-2015.    T.          London.      Lon

У меня вопрос, как я буду читать этот файл в dataframe, так как первая и оставшаяся строки имеют разные столбцы. Решение, которое я попробовал, это прочитать файл как rdd и отфильтровать запись заголовка, а затем преобразовать оставшиеся записи в фрейм данных. Есть ли лучшее решение для? Пожалуйста, помогите мне

Ответы [ 3 ]

1 голос
/ 26 марта 2019

Вы можете загрузить файлы в виде необработанного текста, а затем использовать классы падежей, Either экземпляры и сопоставление с образцом, чтобы разобраться, что и куда идет. Пример этого ниже.

case class Col3(c1: Int, c2: String, c3: Int)
case class Col5(c1: Int, c2: String, c5_col3: String, c4:String, c5: String)
case class Header(value: String)

type C3 = Either[Header, Col3]
type C5 = Either[Header, Col5]

// assume sqlC & sc created 

val path = "tmp.tsv"
val rdd = sc.textFile(path)

val eitherRdd: RDD[Either[C3, C5]] = rdd.map{s =>
  val spl = s.split("\t")
  spl.length match{
    case 3 =>
      val res = Try{
        Col3(spl(0).toInt, spl(1), spl(2).toInt)
      }
      res match{
        case Success(c3) => Left(Right(c3))
        case Failure(_) => Left(Left(Header(s)))
      }
    case 5 =>
      val res = Try{
        Col5(spl(0).toInt, spl(1), spl(2), spl(3), spl(4))
      }
      res match{
        case Success(c5) => Right(Right(c5))
        case Failure(_) => Right(Left(Header(s)))
      }
    case _ => throw new Exception("fail")
  }
}

val rdd3 = eitherRdd.flatMap(_.left.toOption)
val rdd3Header = rdd3.flatMap(_.left.toOption).collect().head
val df3 = sqlC.createDataFrame(rdd3.flatMap(_.right.toOption))

val rdd5 = eitherRdd.flatMap(_.right.toOption)
val rdd5Header = rdd5.flatMap(_.left.toOption).collect().head
val df5 = sqlC.createDataFrame(rdd5.flatMap(_.right.toOption))

df3.show()

df5.show()

Проверено с простым tsv ниже:

col1    col2    col3
0   sfd 300
1   asfd    400
col1    col2    col4    col5    col6
2   pljdsfn R   USA Us
3   sad T   London  Lon

, который дает вывод

+---+----+---+
| c1|  c2| c3|
+---+----+---+
|  0| sfd|300|
|  1|asfd|400|
+---+----+---+

+---+-------+-------+------+---+
| c1|     c2|c5_col3|    c4| c5|
+---+-------+-------+------+---+
|  2|pljdsfn|      R|   USA| Us|
|  3|    sad|      T|London|Lon|
+---+-------+-------+------+---+

Для простоты я проигнорировал форматирование даты, просто сохранив эти поля как строки. однако было бы не намного сложнее добавить анализатор даты, чтобы получить правильный тип столбца.

Точно так же я полагался на ошибку разбора, чтобы указать строку заголовка. Вы можете заменить другую логику, если синтаксический анализ не завершится неудачей или если необходимо выполнить более сложное определение. Точно так же потребуется более сложная логика, чтобы различать разные типы записей одинаковой длины или которые могут содержать (экранированный) символ разделения

1 голос
/ 25 марта 2019

Это что-то вроде хака, но здесь есть решение игнорировать первую строку файла.

val cols = Array("dtype", "date", "type", "location", "locationCode")
val schema = new StructType(cols.map(n => StructField(n ,StringType, true)))
spark.read
    .schema(schema) // we specify the schema
    .option("header", true) // and tell spark that there is a header
    .csv("path/file.csv")

Первая строка - это заголовок, но схема указана.Таким образом, первая строка игнорируется.

0 голосов
/ 26 марта 2019

Вы можете отбросить эти искаженные строки при чтении.

spark.read
          .option("mode", "dropMalformed")
          ...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...