Удаление пустой строки из текстового файла с помощью искры - PullRequest
0 голосов
/ 20 января 2020

У меня есть файл данных в следующем формате: файл имеет 10 столбцов. Между каждой строкой есть пробел. Я должен загрузить эти данные во фрейм данных после пропуска пробела и распечатать различное значение, основанное на последнем столбце типа:

Пример файла данных:

debo bangalore 3 4 5 6 7 8 9 Yes

debo banaglore 4 5 6 7 8 9 10 Yes

abhi Delhi 6 7 9 10 99 99 00 No


Expected o/p:

Yes debo bangalore
No  abhi Delhi




/

/ Defining the data-frame header structure
      val fileHeader = "name address client_add result_code bytes req_method url user hierarchy_code type"
      val schema= StructType(fileHeader.split(" ").map(field=>StructField(field,StringType,true)))
      val textFile=sparkSession.sparkContext.textFile("src/main/resources/student.txt")
      // Converting String RDD to Row RDD for 10 attributes
      val rowRDD = textFile.map(x=>x.split(" "))

      val rowRdd2 = rowRDD.filter(x => (x!= null) && (x.length > 0))
      val rowrdd1=rowRdd2.map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5) , x(6) , x(7) , x(8), x(9)))

      val squidDF = sparkSession.sqlContext.createDataFrame(rowrdd1, schema)
      squidDF.createOrReplaceTempView("data")

Но вышеприведенный код выдает:

20/01/20 23:32:50 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ArrayIndexOutOfBoundsException: 1
    at AccumulatorExample$$anonfun$4.apply(AccumulatorExample.scala:50)
    at AccumulatorExample$$anonfun$4.apply(AccumulatorExample.scala:50)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)

Я не могу выйти из исключения. Невозможно загрузить эти данные во фрейм данных.

Ответы [ 3 ]

0 голосов
/ 21 января 2020

@ Решение Yayati хорошее, за исключением проверки пустой и частичной строки. Без этого вы можете столкнуться с NPE. Они исправлены здесь -

Файл -

debo bangalore 3 4 5 6 7 8 9 Yes

debo banaglore 4 5 6 7 8 9 10 Yes

abhi Delhi 6 7 9 10 99 99 00 No


alex Newyork

Код -

    val fileHeader = "name address client_add result_code bytes req_method url user hierarchy_code type"
    val columnSize = fileHeader.split(" ").size
    val schema= StructType(fileHeader.split(" ").map(field=>StructField(field,StringType,true)))
    val textFile=spark.sparkContext.textFile("/Users/msayed2/Documents/temp/test1.txt")

    // Converting String RDD to Row RDD for 10 attributes
    val rowRDD = textFile.filter(_.trim.length > 0).map(x=>x.split(" ")).filter(_.size == columnSize)

    val rowRdd2 = rowRDD.filter(x => (x!= null) && (x.length > 0))
    val rowrdd1=rowRdd2.map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5) , x(6) , x(7) , x(8), x(9)))

    val squidDF = spark.sqlContext.createDataFrame(rowrdd1, schema)
    squidDF.show()

Результат

+----+---------+----------+-----------+-----+----------+---+----+--------------+----+
|name|  address|client_add|result_code|bytes|req_method|url|user|hierarchy_code|type|
+----+---------+----------+-----------+-----+----------+---+----+--------------+----+
|debo|bangalore|         3|          4|    5|         6|  7|   8|             9| Yes|
|debo|banaglore|         4|          5|    6|         7|  8|   9|            10| Yes|
|abhi|    Delhi|         6|          7|    9|        10| 99|  99|            00|  No|
+----+---------+----------+-----------+-----+----------+---+----+--------------+----+
0 голосов
/ 21 января 2020

Вы можете напрямую использовать фрейм данных Spark с использованием различных режимов анализа:

mode : по умолчанию это РАЗРЕШИТЕЛЬНЫЙ.

Возможные значения:

PERMISSIVE : попытаться проанализировать все строки: для отсутствующих токенов вставлены пустые значения, а дополнительные токены игнорируются.

DROPMALFORMED : отбросить строки, которые содержат меньше или больше токенов, чем ожидалось, или токены, которые не соответствуют схеме.

FAILFAST : прервать с RuntimeException, если обнаружена какая-либо некорректная строка.

Файл -

debo bangalore 3 4 5 6 7 8 9 Yes

debo banaglore 4 5 6 7 8 9 10 Yes

abhi Delhi 6 7 9 10 99 99 00 No


alex Newyork

Код -

import org.apache.spark.sql.types.{StructType,StringType,StructField}

val fileHeader = "name address client_add result_code bytes req_method url user hierarchy_code type"

val columnSize = fileHeader.split(" ").size
val schema= StructType(fileHeader.split(" ").map(field=>StructField(field,StringType,true)))

val csvFilePath = "tmp/data/data_with_empty_lines.txt"

val non_empty_df = spark.read.option("header", "false")
                        .option("delimiter", " ")
                        .option("inferSchema", "true")
                        .schema(schema)
                        .option("mode", "DROPMALFORMED")
                        .csv(csvFilePath)

non_empty_df.show()

Результат -

+----+---------+----------+-----------+-----+----------+---+----+--------------+----+
|name|  address|client_add|result_code|bytes|req_method|url|user|hierarchy_code|type|
+----+---------+----------+-----------+-----+----------+---+----+--------------+----+
|debo|bangalore|         3|          4|    5|         6|  7|   8|             9| Yes|
|debo|banaglore|         4|          5|    6|         7|  8|   9|            10| Yes|
|abhi|    Delhi|         6|          7|    9|        10| 99|  99|            00|  No|
+----+---------+----------+-----------+-----+----------+---+----+--------------+----+

Для получения дополнительной информации о чтении файлов CSV - https://docs.databricks.com/data/data-sources/read-csv.html

0 голосов
/ 20 января 2020

Я изменил и протестировал ваш код следующим образом:

  val fileHeader = 
    "name address client_add result_code bytes req_method url user hierarchy_code type"

  val schema= StructType(fileHeader.split(" ").map(field=>StructField(field,StringType,true)))

  val textFileRDD=sparkSession.sparkContext.textFile("src/main/resources/student.txt")

  val rowRDD = textFileRDD.filter(_.trim.length >0).map(_.split(" ")) // Removing empty lines and creating RDD of "clean" data

  val rowrdd1=rowRdd2.map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5) , x(6) , x(7) , x(8), x(9)))

  val squidDF = sparkSession.sqlContext.createDataFrame(rowrdd1, schema)

  squidDF.show

  squidDF.createOrReplaceTempView("data")

Следует помнить, попробуйте отфильтровать ваши данные как можно раньше в конвейере данных

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...