как исправить задачу не сериализуемое исключение в sparkstreaming - PullRequest
1 голос
/ 23 апреля 2019

Я хочу обобщить интернет-логи, используя sparkstreaming. У меня есть трансформация данные журнала в карту. Ошибка возникает при обработке вычислений.

Установить конфигурацию сериализации искры в avro. Но это не работает.

Следующий код:

...
val sc = new SparkContext(conf)
...
val lines = kafkaStream.map(_._2)
  .map { _.split("\\|") }
  .map { arr =>
    Map(
      ...
    )
  }

lines.print()  // this works

lines.map { clearMap =>  // the line exception point to  
    ...
    val filter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("^\\d+_" + uvid + "_.*$"))

    val r = HBaseUtils.queryFromHBase(sc, "flux", zerotime.getBytes, nowtime.getBytes,filter)
    val uv = if (r.count() == 0) 1 else 0

    val sscount = clearMap("sscount")
    val vv = if (sscount == "0") 1 else 0

    val cip = clearMap("cip")
    val filter2 = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("^\\d+_\\d+_\\d+_" + cip + "_.*$"))

    val r2 = HBaseUtils.queryFromHBase(sc, "flux", zerotime.getBytes, nowtime.getBytes, filter2)
    val newip = if (r2.count() == 0) 1 else 0

    val filter3 = new RowFilter(CompareOp.EQUAL,new RegexStringComparator("^\\d+_"+uvid+"_.*$"))
    val r3 = HBaseUtils.queryFromHBase(sc, "flux", null, nowtime.getBytes, filter3)
    val newcust = if (r3.count() == 0) 1 else 0

    (nowtime, pv, uv, vv, newip, newcust)
  }
...

Ниже приведено сообщение об исключении:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2056)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:546)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:546)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:679)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:545)
    at cn.tedu.flux.fluxdriver$.main(fluxdriver.scala:73)
    at cn.tedu.flux.fluxdriver.main(fluxdriver.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@3fc08eec)
    - field (class: cn.tedu.flux.fluxdriver$$anonfun$main$2, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class cn.tedu.flux.fluxdriver$$anonfun$main$2, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 12 more

1 Ответ

0 голосов
/ 24 апреля 2019

Я решил эту проблему. SparkContext не может быть сериализуемым в качестве аргумента, если он определен в функции.Поэтому я пытаюсь определить его следующим образом:

объектный драйвер {

var sc:SparkContext=null
def main(arg:Array[String]):Unit = {
    sc = new SparkContext();
....

}}

И это работает!

раньше, вот так:

Драйвер объекта {

def main(arg:Array[String]):Unit = {

vla sc = new SparkContext;

......

}}

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...