Spark cannot load a DataFrame from Elasticsearch due to a field format exception
0 votes
25 April 2018

My DataFrame read fails with a NumberFormatException on one of the nested JSON fields when reading from Elasticsearch. I am not providing any schema, since it should be inferred automatically from Elastic.
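For context, elasticsearch-hadoop builds the Spark schema from the index mapping, so the types it will infer can be checked by fetching the mapping directly. A minimal sketch; "XXXXXX" stands for the masked node address used in the code below:

import scala.io.Source

// Fetch the mapping of the index/type being read; the field types declared
// here are what elasticsearch-hadoop uses to build the Spark schema.
println(Source.fromURL("http://XXXXXX:80/batch_index/_mapping/ticket").mkString)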

package org.arc

import org.apache.spark._
import org.apache.spark.sql.{ Dataset, Row, SparkSession }
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.serializer.KryoSerializer

object SparkOnES {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("SparkESTest")
      .config("spark.master", "local[*]")
      .config("spark.sql.warehouse.dir", "C://SparkScala//SparkLocal//spark-warehouse")
      // spark.serializer is a Spark setting, so it belongs on the session,
      // not in the DataFrameReader options below.
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    // 1. Read a sample JSON file (kept commented out for reference)
    import spark.implicits._

    //val myjson = spark.read.json("C:\\Users\\jasjyotsinghj599\\Desktop\\SampleTest.json")
    //myjson.show(false)

    // 2. Read data from Elasticsearch; the schema is inferred from the
    // index mapping of batch_index/ticket
    val esdf = spark.read.format("org.elasticsearch.spark.sql")
      .option("es.nodes", "XXXXXX")        // ES node address (masked)
      .option("es.port", "80")
      .option("es.query", "?q=*")          // match-all query
      .option("es.nodes.wan.only", "true") // connect only to the declared nodes
      .option("pushdown", "true")          // push filters down to Elasticsearch
      .load("batch_index/ticket")

    esdf.createOrReplaceTempView("esdf")
    spark.sql("SELECT * FROM esdf LIMIT 1").show(false)
    val esdf_fltr_lt = esdf.take(1)

  }
}
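A quick way to confirm which type was inferred for the failing field is to print the schema right after the load; since the schema comes from the index mapping rather than the documents, this should work even when reading the rows fails:

// Check the type inferred for tvl_tkt_tot_chrg_amt; a long here combined
// with values like "161.60" in the documents would explain the exception.
esdf.printSchema()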

The error stack says it cannot parse the input field. Looking at the exception, the problem seems to be caused by a mismatch between the expected data type (int, float, double) and the one actually received (a string):

Caused by: java.lang.NumberFormatException: For input string: "161.60"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
    at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
    at org.elasticsearch.spark.serialization.ScalaValueReader.parseLong(ScalaValueReader.scala:142)
    at org.elasticsearch.spark.serialization.ScalaValueReader$$anonfun$longValue$1.apply(ScalaValueReader.scala:141)
    at org.elasticsearch.spark.serialization.ScalaValueReader$$anonfun$longValue$1.apply(ScalaValueReader.scala:141)
    at org.elasticsearch.spark.serialization.ScalaValueReader.checkNull(ScalaValueReader.scala:120)
    at org.elasticsearch.spark.serialization.ScalaValueReader.longValue(ScalaValueReader.scala:141)
    at org.elasticsearch.spark.serialization.ScalaValueReader.readValue(ScalaValueReader.scala:89)
    at org.elasticsearch.spark.sql.ScalaRowValueReader.readValue(ScalaEsRowValueReader.scala:46)
    at org.elasticsearch.hadoop.serialization.ScrollReader.parseValue(ScrollReader.java:770)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:720)
    ... 25 more
18/04/25 23:33:53 WARN TaskSetManager: Lost task 3.0 in stage 1.0 (TID 4, localhost): org.elasticsearch.hadoop.rest.EsHadoopParsingException: Cannot parse value [161.60] for field [tvl_tkt_tot_chrg_amt]
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:723)
    at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:867)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:710)
    at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:476)
    at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:401)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:296)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:269)
    at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:393)
    at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:92)
    at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:61)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: For input string: "161.60"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
    at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
    at org.elasticsearch.spark.serialization.ScalaValueReader.parseLong(ScalaValueReader.scala:142)
    at org.elasticsearch.spark.serialization.ScalaValueReader$$anonfun$longValue$1.apply(ScalaValueReader.scala:141)
    at org.elasticsearch.spark.serialization.ScalaValueReader$$anonfun$longValue$1.apply(ScalaValueReader.scala:141)
    at org.elasticsearch.spark.serialization.ScalaValueReader.checkNull(ScalaValueReader.scala:120)
    at org.elasticsearch.spark.serialization.ScalaValueReader.longValue(ScalaValueReader.scala:141)
    at org.elasticsearch.spark.serialization.ScalaValueReader.readValue(ScalaValueReader.scala:89)
    at org.elasticsearch.spark.sql.ScalaRowValueReader.readValue(ScalaEsRowValueReader.scala:46)
    at org.elasticsearch.hadoop.serialization.ScrollReader.parseValue(ScrollReader.java:770)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:720)
    ... 25 more

18/04/25 23:33:53 INFO SparkContext: Invoking stop() from shutdown hook
18/04/25 23:33:53 INFO SparkUI: Stopped Spark web UI at http://10.1.2.244:4040
18/04/25 23:33:53 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/04/25 23:33:53 INFO MemoryStore: MemoryStore cleared
18/04/25 23:33:53 INFO BlockManager: BlockManager stopped
18/04/25 23:33:53 INFO BlockManagerMaster: BlockManagerMaster stopped
18/04/25 23:33:53 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/04/25 23:33:53 INFO SparkContext: Successfully stopped SparkContext
18/04/25 23:33:53 INFO ShutdownHookManager: Shutdown hook called
18/04/25 23:33:53 INFO ShutdownHookManager: Deleting directory 
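From the trace it looks like the mapping declares tvl_tkt_tot_chrg_amt as long, while the documents contain "161.60", which Long.parseLong rejects. As a possible workaround until the mapping or the data is fixed, the field can be excluded on read; a sketch, assuming the es.read.field.exclude option of elasticsearch-hadoop:

// Same read as above, but skipping the field whose values do not match the
// declared mapping type; es.read.field.exclude takes a comma-separated list
// of field names.
val esdfSafe = spark.read.format("org.elasticsearch.spark.sql")
  .option("es.nodes", "XXXXXX")
  .option("es.port", "80")
  .option("es.query", "?q=*")
  .option("es.nodes.wan.only", "true")
  .option("es.read.field.exclude", "tvl_tkt_tot_chrg_amt")
  .load("batch_index/ticket")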