Spark фильтр функционирует с ошибкой - PullRequest
0 голосов
/ 22 февраля 2020

Ниже приведен пример необработанных данных

tweet_id,airline_sentiment,airline_sentiment_confidence,negativereason,negativereason_confidence,airline,airline_sentiment_gold,name,negativereason_gold,retweet_count,text,tweet_coord,tweet_created,tweet_location,user_timezone
570306133677760513,neutral,1.0,,,Virgin America,,cairdin,,0,@VirginAmerica What @dhepburn said.,,2015-02-24 11:35:52 -0800,,Eastern Time (US & Canada)

Ниже приведена моя программа

val data = sc.textFile("/user/inputs/Tweets.csv")
val map_data = data.map(x=> x.split(","))
val filterdata  = map_data.filter(x=> x(5) == "Virgin America").count()  

. Ниже приводится исключение:

[Stage 0:>                                                                                                                                                                        (0 + 2) / 2]
20/02/21 21:50:41 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-0-1-10.ec2.internal, executor 1): java.lang.ArrayIndexOutOfBoundsException: 5                       
        at $line27.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:31)                                                                                               
        at $line27.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:31)   

Ответы [ 2 ]

2 голосов
/ 22 февраля 2020

ваши данные нельзя разделить, как это может быть причиной извлечения индекса массива, см. Ниже код .... который будет копировать вашу версию в варианте 1

Я уточнил его с помощью API-интерфейса spark csv, который может быть полезным для вас.


    package examples

    import org.apache.log4j.Level

    object CSVTest extends App {
      import org.apache.spark.sql.{Dataset, SparkSession}
      val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()
      val logger = org.apache.log4j.Logger.getLogger("org")
      logger.setLevel(Level.WARN)
      import spark.implicits._
      import org.apache.spark.sql.functions._
      val csvData: Dataset[String] = spark.sparkContext.parallelize(
        """
          |tweet_id,airline_sentiment,airline_sentiment_confidence,negativereason,negativereason_confidence,airline,airline_sentiment_gold,name,negativereason_gold,retweet_count,text,tweet_coord,tweet_created,tweet_location,user_timezone
          |570306133677760513,neutral,1.0,,,Virgin America,,cairdin,,0,@VirginAmerica What @dhepburn said.,,2015-02-24 11:35:52 -0800,,Eastern Time (US & Canada)
        """.stripMargin.lines.toList).toDS()


      println("option 2 : spark csv version ")
      val frame = spark.read.option("header", true).option("inferSchema",true).csv(csvData)
      frame.show()
      frame.printSchema()
     println( frame.filter($"airline" === "Virgin America").count())

     println("option 1: your version which is not splittable thats the reason getting arrayindex out of bound ")
      val filterdata = csvData.map(x=> x.split(","))
      filterdata.foreach(x => println(x.mkString))
    //    filterdata.show(false)
    //    filterdata.filter {x=> {
    //      println(x)
    //      x(5) == "Virgin America"
    //    }
    //    }
    //  .count()


    }

Результат:

option 2 : spark csv version 
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+-------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+
|          tweet_id|airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|       airline|airline_sentiment_gold|   name|negativereason_gold|retweet_count|                text|tweet_coord|       tweet_created|tweet_location|       user_timezone|
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+-------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+
|570306133677760513|          neutral|                         1.0|          null|                     null|Virgin America|                  null|cairdin|               null|            0|@VirginAmerica Wh...|       null|2015-02-24 11:35:...|          null|Eastern Time (US ...|
+------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+-------+-------------------+-------------+--------------------+-----------+--------------------+--------------+--------------------+

root
 |-- tweet_id: long (nullable = true)
 |-- airline_sentiment: string (nullable = true)
 |-- airline_sentiment_confidence: double (nullable = true)
 |-- negativereason: string (nullable = true)
 |-- negativereason_confidence: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- airline_sentiment_gold: string (nullable = true)
 |-- name: string (nullable = true)
 |-- negativereason_gold: string (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- tweet_coord: string (nullable = true)
 |-- tweet_created: string (nullable = true)
 |-- tweet_location: string (nullable = true)
 |-- user_timezone: string (nullable = true)

1
option 1: your version which is not splittable thats the reason getting arrayindex out of bound 
tweet_idairline_sentimentairline_sentiment_confidencenegativereasonnegativereason_confidenceairlineairline_sentiment_goldnamenegativereason_goldretweet_counttexttweet_coordtweet_createdtweet_locationuser_timezone
570306133677760513neutral1.0Virgin Americacairdin0@VirginAmerica What @dhepburn said.2015-02-24 11:35:52 -0800Eastern Time (US & Canada)


Process finished with exit code 0

0 голосов
/ 28 февраля 2020

Я нашел решение, это некоторые нулевые данные в моем наборе данных, которые вызывали arrayindexoutofboundexception. Я изменил функцию следующим образом:

Я использовал функцию array.length, чтобы отфильтровать те строки, где мы имеем нулевые данные

Образцы данных

DBN, название школы, количество тестируемых, среднее значение для чтения, среднее значение по математике, среднее значение для записи 01M292, Школа международных исследований имени Генри-стрит, 31 391 425 385 01M448, Высший район университета Школа, 60 394 419 387 01M450, средняя школа East Side Community School, 69 418 431 402 01M458, СПУТНИКОВАЯ АКАДЕМИЯ FORSYTH ST, 26 385 370 378 01M509, CMSP HIGH SCHOOL ,,,,

'' '

     val conf = new SparkConf().setAppName("Spark Scala School Data analysis Example").setMaster("local[1]")
    val sc = new SparkContext(conf) 
    val data = sc.textFile("C:\\Sankhadeep\\Study\\data\\SAT_School_Level_Results.csv", 2)
    val spark = SparkSession.builder().appName("sample").master("local").getOrCreate()
    val map_data = data.map(x=> x.split(","))
    //val map_data1 = map_data.map(x=> handleNull(x))
    val sample = map_data.filter(x=> (x.length > 4)).filter(x=> (x(4) != "Mathematics Mean")).filter(x=> (x(4).toInt > 500  ))
    '''
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...