Мне нужно пропустить три строки из кадра данных при загрузке из файла CSV в Scala - PullRequest
2 голосов
/ 28 мая 2019

Я загружаю свой CSV-файл во фрейм данных, и я могу это сделать, но мне нужно пропустить начальные три строки из файла.

Я попробовал команду .option(), указав заголовок как true, но он игнорирует только первую строку.

val df = spark.sqlContext.read
    .schema(Myschema)
    .option("header",true)
    .option("delimiter", "|")
    .csv(path)

Я думал о том, чтобы дать заголовок в виде 3 строк, но я не мог найти способ сделать это.

альтернативная мысль: пропустить эти 3 строки из фрейма данных

Пожалуйста, помогите мне с этим. Заранее спасибо.

Ответы [ 3 ]

0 голосов
/ 28 мая 2019

Обычным способом решения вашей проблемы будет индексирование информационного кадра и фильтрация индексов, которые больше 2.

Простой подход:

Как предлагается вдругой ответ, вы можете попробовать добавить индекс с помощью monotonically_increasing_id.

df.withColumn("Index",monotonically_increasing_id)
  .filter('Index > 2)
  .drop("Index")

. Тем не менее, это будет работать, только если первые 3 строки находятся в первом разделе.Более того, как уже упоминалось в комментариях, сегодня это так, но этот код может полностью сломаться при появлении новых версий или появиться, и это будет очень сложно отладить.Действительно, контракт в API - это просто «Сгенерированный идентификатор гарантированно будет монотонно увеличивающимся и уникальным, но не последовательным».Поэтому не очень мудро предполагать, что они всегда будут начинаться с нуля.В текущей версии могут быть даже другие случаи, когда это не работает (хотя я не уверен).

Чтобы проиллюстрировать мою первую проблему, взгляните на это:

scala> spark.range(4).withColumn("Index",monotonically_increasing_id()).show()
+---+----------+
| id|     Index|
+---+----------+
|  0|         0|
|  1|         1|
|  2|8589934592|
|  3|8589934593|
+---+----------+

Мы удалили бы только две строки ...

Безопасный подход:

Предыдущий подход будет работать большую часть времени, хотя, чтобы быть безопасным, вы можете использоватьzipWithIndex из API RDD для получения последовательных индексов.

def zipWithIndex(df : DataFrame, name : String) : DataFrame = {
  val rdd = df.rdd.zipWithIndex
    .map{ case (row, i) => Row.fromSeq(row.toSeq :+ i) }
  val newSchema = df.schema
    .add(StructField(name, LongType, false))
  df.sparkSession.createDataFrame(rdd, newSchema)
}
zipWithIndex(df, "index").where('index > 2).drop("index")

Мы можем проверить, что это безопаснее:

scala> zipWithIndex(spark.range(4).toDF("id"), "index").show()
+---+-----+
| id|index|
+---+-----+
|  0|    0|
|  1|    1|
|  2|    2|
|  3|    3|
+---+-----+
0 голосов
/ 28 мая 2019

Вы можете попробовать изменить wrt к вашей схеме.

 import org.apache.spark.sql.Row
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  //Read CSV
  val file = sc.textFile("csvfilelocation")

  //Remove first 3 lines
  val data = file.mapPartitionsWithIndex{ (idx, iter) => if (idx == 0) iter.drop(3) else iter }

  //Create RowRDD by mapping each line to the required fields 
  val rowRdd = data.map(x=>Row(x(0), x(1)))

  //create dataframe by calling sqlcontext.createDataframe with rowRdd and your schema   
  val df = sqlContext.createDataFrame(rowRdd, schema)
0 голосов
/ 28 мая 2019

Вы можете попробовать эту опцию

df.withColumn("Index",monotonically_increasing_id())
        .filter(col("Index") > 2)
        .drop("Index")
...