Задача не сериализуется в scala, org. apache .spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner. scala: 403) - PullRequest
0 голосов
/ 19 февраля 2020

Код работает нормально, когда я пишу это:

def getData(filePath: String, recordSeperator:String , columnSeperator:String): DataFrame = {
    val data = spark.read
      .text(filePath)
      .toDF("val")
      .withColumn("id", monotonically_increasing_id())

    val count = data.count()

    val header = data.where("id==1").collect().map(s => s.getString(0)).apply(0)
    val columns = header
      .replace("H|*|", "")
      .replace(recordSeperator, "")
      .split(columnSeperator)



    import  spark.implicits._

    val correctData = data.where('id > 1 && 'id < count - 1)



    val splitIntoCols = correctData.rdd.map(s=>{
        val arr = s.getString(0).replace("|##|", "").split("\\|\\*\\|")

        //val arr = s.getString(0)
        //val arr1 = arr.replace(recordSeperator, "").split(columnSeperator)
        //println(arr1)

        RowFactory.create(arr:_*)
    })
    val struct = StructType(columns.map(s=>StructField(s, StringType, true)))
    val finalDF = spark.createDataFrame(splitIntoCols,struct)


    return finalDF

    }

Но выдает ошибку, когда я пишу это: -

def getData(filePath: String, recordSeperator:String , columnSeperator:String): DataFrame = {



    val data = spark.read
      .text(filePath)
      .toDF("val")
      .withColumn("id", monotonically_increasing_id())

    val count = data.count()

    val header = data.where("id==1").collect().map(s => s.getString(0)).apply(0)
    val columns = header
      .replace("H|*|", "")
      .replace(recordSeperator, "")
      .split(columnSeperator)



    import  spark.implicits._

    val correctData = data.where('id > 1 && 'id < count - 1)



    val splitIntoCols = correctData.rdd.map(s=>{
        //val arr = s.getString(0).replace("|##|", "").split("\\|\\*\\|")

        val arr = s.getString(0)
        val arr1 = arr.replace(recordSeperator, "").split(columnSeperator)


        RowFactory.create(arr1:_*)
    })
    val struct = StructType(columns.map(s=>StructField(s, StringType, true)))
    val finalDF = spark.createDataFrame(splitIntoCols,struct)


    return finalDF
    }

Здесь recordSeperator - "| ## |" и columnSeperator - "\ | \ * \ |" ОШИБКА: "stackTrace": "org. apache .spark.SparkException: задача не сериализуема, в org. apache .spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner. scala: 403) org. apache .spark.util.ClosureCleaner $ .org $ апач $ искра $ Util $ ClosureCleaner $$ чистого (ClosureCleaner scala:. 393)

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...