delete hbase на spark rdd , Задача не сериализуема - PullRequest
0 голосов
/ 07 мая 2018

я хочу удалить данные из hbase с помощью rdd, вот мои коды

def delGraphIdVertexInfo(vertexInfoRDD: RDD[(BqsVertex, String)]): Unit = {
    vertexInfoRDD.foreachPartition(partition => {
        val hc = HBaseConfiguration.create()
        val cn = ConnectionFactory.createConnection(hc)
        val userTable = TableName.valueOf(U_SPARK_GRAPH_GRAPHID_VERTEX_INFO_TABLE)
        val table = cn.getTable(userTable)

        partition.foreach(x => {
            val key = x._2 + CommonConstant.GRAPHID_EDGE_SPLIT + x._1.toString
            val d = new Delete(Bytes.toBytes(key))
            table.delete(d)
        })
        table.close()
        cn.close()
    })
}

Я ввел HBaseConfiguration и createConnection в foreachPartition, но все равно произошла ошибка.

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
at com.bqs.bigdata.calculation.ComplexNetWorkSparkCommon.delGraphIdVertexInfo(ComplexNetWorkSparkCommon.scala:176)
at com.bqs.bigdata.calculation.ComplexNetWorkSparkInput.mergeGraph(ComplexNetWorkSparkInput.scala:166)
at com.bqs.bigdata.calculation.ComplexNetWorkSparkInput.handleEdgeInfo(ComplexNetWorkSparkInput.scala:298)
at com.bqs.bigdata.calculation.ComplexNetWorkSparkInput.main(ComplexNetWorkSparkInput.scala:38)
at com.bqs.bigdata.MobileCertNoComplexNetWork.main(MobileCertNoComplexNetWork.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
Serialization stack:
    - object not serializable (class: org.apache.hadoop.conf.Configuration, value: Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, hbase-default.xml, hbase-site.xml)
    - field (class: com.bqs.bigdata.calculation.ComplexNetWorkSparkCommon, name: HBaseConf, type: class org.apache.hadoop.conf.Configuration)
    - object (class com.bqs.bigdata.MobileCertNoComplexNetWork$, com.bqs.bigdata.MobileCertNoComplexNetWork$@68a1bb31)
    - field (class: com.bqs.bigdata.calculation.ComplexNetWorkSparkCommon$$anonfun$delGraphIdVertexInfo$1, name: $outer, type: class com.bqs.bigdata.calculation.ComplexNetWorkSparkCommon)
    - object (class com.bqs.bigdata.calculation.ComplexNetWorkSparkCommon$$anonfun$delGraphIdVertexInfo$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)

что мне делать?

...