GettingTask не сериализуемое исключение при выполнении Spark Job - PullRequest
0 голосов
/ 10 мая 2018

Я пытаюсь получить application_number запись из таблицы hive и собрать ее в виде списка. и из этого списка я перебираю список и для каждого application_number я пытаюсь вызвать команду curl.

Вот мой пример кода:

object th extends Serializable
{
  def main(args: Array[String]): Unit =
  {


    val conf = new SparkConf().setAppName("th").setMaster("local")
    conf.set("spark.debug.maxToStringFields", "10000000")
    val context = new SparkContext(conf)
    val sqlCotext = new SQLContext(context)
    val hiveContext = new HiveContext(context)
    import hiveContext.implicits._   
    val list = hiveContext.sql("select application_number from tableA").collect().take(100)     
    val l1=context.parallelize(list)   
    val stu1 =StructType(
      StructField("application_number", LongType, true) ::
      StructField("event_code", StringType, true) ::
      StructField("event_description", StringType, true) ::
      StructField("event_recorded_date", StringType, true) :: Nil)
    var initialDF1 = sqlCotext.createDataFrame(context.emptyRDD[Row], stu1)
    l1.repartition(10).foreachPartition(f=>{f.foreach(f=>
      {
        val schema=StructType(List(
        StructField("queryResults",StructType(
        List(StructField("searchResponse",StructType(
          List(StructField("response",StructType(
            List(StructField("docs",ArrayType(StructType(
              List(               
                     StructField("transactions",ArrayType(StructType(
                        List
                        (
                            StructField("code", StringType, nullable = true),
                            StructField("description", StringType, nullable = true),
                            StructField("recordDate", StringType, nullable = true)
                        )
                        )))                    
                  )
            ))))
          )))
        )))
      ))
        ))

      val z = f.toString().replace("[","").replace("]","").replace(" ","").replace("(","").replace(")","")
      if(z!= null)
      {
      val cmd = Seq("curl", "-X", "POST", "--insecure", "--header", "Content-Type: application/json", "--header", "Accept: application/json", "-d", "{\"searchText\":\""+z+"\",\"qf\":\"applId\"}", "https://ped.uspto.gov/api/queries")      //cmd.!
      val r = cmd.!!
      val r1 = r.toString()
      val rdd = context.parallelize(Seq(r1))
      val dff = sqlCotext.read.schema(schema).json(rdd.toDS) 
      val dfContent = dff.select(explode(dff("queryResults.searchResponse.response.docs.transactions"))).toDF("transaction")
      val a1 = dfContent.select("transaction.code").collect()
      val a2 = dfContent.select("transaction.description").collect()
      val a3 = dfContent.select("transaction.recordDate").collect()    
      for (mmm1 <- a1; mm2 <- a2; mm3 <- a3) 
      {
          val ress1 = mmm1.toString().replace("[", " ").replace("]", " ").replace("WrappedArray(","").replace(")","")
          val res2 = mm2.toString().replace("[", " ").replace("]", " ").replace("WrappedArray(","").replace(")","")
          val res3 = mm3.toString().replace("[", " ").replace("]", " ").replace("WrappedArray(","").replace(")","")          
          initialDF1 = initialDF1.union(Seq((z, ress1, res2, res3)).toDF("application_number", "event_code", "event_description", "event_recorded_date"))
       }
      }        

      })})
     initialDF1.registerTempTable("curlTH")
     hiveContext.sql("insert into table default.ipg_tableB select application_number,event_code,event_description,event_recorded_date from curlTH")
    }
}

Я получаю задачу, а не сериализованное исключение.

Вот мой след ошибки:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
    at newipg170103.th$.main(th.scala:58)
    at newipg170103.th.main(th.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@1e592ef2)
    - field (class: newipg170103.th$$anonfun$main$1, name: context$1, type: class org.apache.spark.SparkContext)
    - object (class newipg170103.th$$anonfun$main$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 20 more

1 Ответ

0 голосов
/ 10 мая 2018

В Apache Spark запрещено использовать SQLContext, SparkContext или SparkSession в действии или преобразовании (map, foreach, mapPartitions, foreachPartition и т. Д.).

Следовательно

l1.repartition(10).foreachPartition(f=>{f.foreach(f=>
   ...
   val rdd = context.parallelize(Seq(r1))
   val dff = sqlCotext.read.schema(schema).json(rdd.toDS) 
)})

недействительный код Spark.

...