org.apache.spark.SparkException: задача не сериализуема - Scala - PullRequest
0 голосов
/ 19 мая 2019

Я читаю текстовый файл, и это файл фиксированной ширины, который мне нужно преобразовать в CSV.Моя программа отлично работает на локальном компьютере, но когда я запускаю ее на кластере, она выдает исключение «Задача не сериализуема».

Я пытался решить ту же проблему с map и mapPartition.

Работает нормальнос помощью toLocalIterator на RDD.Но он не работает с большими файлами (у меня есть файлы размером 8 ГБ)

Ниже приведен код с использованием mapPartition, который я недавно пытался

// прочитать исходный файл и создать RDD

def main(){
    var inpData = sc.textFile(s3File)
    LOG.info(s"\n inpData >>>>>>>>>>>>>>> [${inpData.count()}]")

    val rowRDD = inpData.mapPartitions(iter=>{
    var listOfRow = new ListBuffer[Row]
    while(iter.hasNext){
       var line = iter.next()
       if(line.length() >= maxIndex){
          listOfRow += getRow(line,indexList)
        }else{
          counter+=1
        }
     }
    listOfRow.toIterator
    })

    rowRDD .foreach(println)
}

case class StartEnd(startingPosition: Int, endingPosition: Int) extends Serializable

def getRow(x: String, inst: List[StartEnd]): Row = {
    val columnArray = new Array[String](inst.size)
    for (f <- 0 to inst.size - 1) {
      columnArray(f) = x.substring(inst(f).startingPosition, inst(f).endingPosition)
    }
    Row.fromSeq(columnArray)
}

// Примечание: для вашей ссылки, indexList я создал с помощью класса case StartEnd, который выглядит как показано ниже после создания

[List(StartEnd(0,4), StartEnd(4,10), StartEnd(7,12), StartEnd(10,14))]

Эта программа отлично работает на моем локальном компьютере.Но когда я включаю кластер (AWS), он выдает исключение, как показано ниже.

17:24:10.947 [Driver] ERROR bms.edl.dt.transform.FileConversion.convertFixedWidthToCsv - Exception [Task not serializable] 
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340) ~[glue-assembly.jar:?]
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330) ~[glue-assembly.jar:?]
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156) ~[glue-assembly.jar:?]
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294) ~[glue-assembly.jar:?]
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:794) ~[glue-assembly.jar:?]
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793) ~[glue-assembly.jar:?]
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) ~[glue-assembly.jar:?]
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) ~[glue-assembly.jar:?]
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) ~[glue-assembly.jar:?]
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:793) ~[glue-assembly.jar:?]


    Caused by: java.io.NotSerializableException: sun.nio.cs.UTF_8
Serialization stack:
- object not serializable (class: sun.nio.cs.UTF_8, value: UTF-8)
- field (class: org.apache.logging.log4j.core.layout.AbstractStringLayout, name: charset, type: class java.nio.charset.Charset)
- object (class org.apache.logging.log4j.core.layout.PatternLayout, %d{HH:mm:ss.SSS} [%t] %-5level %logger{5}.%M - %msg%n)
- field (class: org.apache.logging.log4j.core.appender.AbstractAppender, name: layout, type: interface org.apache.logging.log4j.core.Layout)
- object (class org.apache.logging.log4j.core.appender.ConsoleAppender, STDOUT)
- writeObject data (class: java.util.concurrent.ConcurrentHashMap)
- object (class java.util.concurrent.ConcurrentHashMap, {STDOUT=STDOUT})
- field (class: org.apache.logging.log4j.core.config.AbstractConfiguration, name: appenders, type: interface java.util.concurrent.ConcurrentMap)

- object (class org.apache.logging.log4j.core.config.xml.XmlConfiguration, XmlConfiguration[location=jar:file:/mnt/yarn/usercache/root/filecache/163/edl-dt-1.9-SNAPSHOT.jar!/log4j2.xml])
- field (class: org.apache.logging.log4j.core.LoggerContext, name: configuration, type: interface org.apache.logging.log4j.core.config.Configuration)
- object (class org.apache.logging.log4j.core.LoggerContext, org.apache.logging.log4j.core.LoggerContext@418bb61f)

- field (class: org.apache.logging.log4j.core.Logger, name: context, type: class org.apache.logging.log4j.core.LoggerContext)
- object (class org.apache.logging.log4j.core.Logger, com.bms.edl.dt.transform.FileConversion:TRACE in 681842940)
- field (class: com.bms.edl.dt.transform.FileConversion, name: LOG, type: interface org.apache.logging.log4j.Logger)
- object (class com.bms.edl.dt.transform.FileConversion, com.bms.edl.dt.transform.FileConversion@984ddbb)
- field (class: com.bms.edl.dt.transform.FileConversion$$anonfun$7, name: $outer, type: class com.bms.edl.dt.transform.FileConversion)
- object (class com.bms.edl.dt.transform.FileConversion$$anonfun$7, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) ~[glue-assembly.jar:?]
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) ~[glue-assembly.jar:?]
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) ~[glue-assembly.jar:?]

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:337) ~[glue-assembly.jar:?]
... 71 more
17:24:10.954 [Driver] TRACE bms.edl.dt.transform.FileConversion.convertFixedWidthToCsv - Exit
17:24:10.954 [Driver] INFO bms.edl.dt.transform.FileConversion.apply - counterMap>>>>>>>>>Map(ResultantDF -> [], ExceptionString ->

Exception occurred while applying the FileConversion transformation and the exception Message is :Task not serializable)
17:24:11.692 [Driver] INFO bms.edl.dt.transform.FileConversion.apply - df count >>>>>>>>>0

17:24:11.692 [Driver] INFO bms.edl.dt.transform.FileConversion.apply - THERE WAS AN EXCEPTION FIX WIDTHING
17:24:11.692 [Driver] INFO bms.edl.dt.transform.FileConversion.dataTransform - THERE WAS AN EXCEPTION -- sb is not empty
17:24:11.693 [Driver] TRACE bms.edl.dt.transform.FileConversion.dataTransform - Exit
17:24:11.693 [Driver] INFO bms.edl.dt.transform.FileConversion.dataTransform - result>>>>>>>>Map(ResultantDF -> [], ExceptionString -> 
Exception occurred while applying the FileConversion transformation and the exception Message is :Task not serializable
Exception occurred while applying the FileConversion transformation and the exception Message is :Task not serializable)
17:24:11.693 [Driver] TRACE edl.core.services.reflection.ReflectionInvoker$.invokeDTMethod - Exit

Я не могу понять, что здесь не так, а что не сериализуемо, почему оно вызывает исключение.

Любая помощь приветствуется.Заранее спасибо!

1 Ответ

2 голосов
/ 19 мая 2019

Вы вызываете метод getRow внутри преобразования Spark mapPartition.Это заставляет искру передать экземпляр вашего основного класса рабочим.Основной класс содержит LOG в качестве поля.Кажется, что этот журнал не подходит для сериализации.Вы можете

a) переместить getRow из LOG в другой object (общий способ решения таких проблем)

b) сделать LOG a lazy val

в) использовать другую библиотеку журналов

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...