Я попытался создать восстанавливаемое искровое потоковое задание с некоторыми аргументами, полученными из базы данных.Но тогда у меня возникла проблема: она всегда выдает ошибку сериализации, когда я пытаюсь перезапустить задание с контрольной точки.
18/10/18 09:54:33 ОШИБКА Исполнитель: Исключение в задании 1.0на этапе 56.0 (TID 132) java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration не может быть приведен к scala.collection.MapLike в com.ptnj.streaming.alertJob.InputDataParser $ .kafka_stream_handle (InputDataParser.) в com.ptnj.streaming.alertJob.InstanceAlertJob $$ anonfun $ 1.apply (InstanceAlertJob.scala: 38) в com.ptnj.streaming.alertJob.InstanceAlertJob $$ anonfun $ 1.apply (InstanceAlertJob.scala.collection.Iterator $$ anon $ 11.next (Iterator.scala: 410) в scala.collection.Iterator $$ anon $ 13.hasNext (Iterator.scala: 463) в scala.collection.Iterator $$ anon $ 11.hasNext (Iterator.scala: 409) в scala.collection.Iterator $$ anon $ 13.hasNext (Iterator.scala: 462) в scala.collection.Iterator $$ anon $ 12.hasNext (Iterator.scala: 440) в scala.collection.Iterator $$Анон $ 11.hasNext (Iterator.scala: 409) в org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write (BypassMergeSortShuffleWriter.java:126) в org.apache.spark.scheduler.ShuffleMapTask.runTask.Sachespark.Sche.ShuffleMapTask.runTask (ShuffleMapTask.scala: 53) в org.apache.spark.scheduler.Task.run (Task.scala: 99) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 282) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) в java.lang.Threadjrun (748)
Я последовал совету Максима G в этом существующем вопросе , и это, похоже, помогает.
Но теперь есть еще одно исключение.И из-за этой проблемы я должен создавать широковещательные переменные при преобразовании потока, например
val kafka_data_streaming = stream.map(x => DstreamHandle.kafka_stream_handle(url, x.value(), sc))
Так что мне нужно будет поместить sparkcontext в качестве параметра в функцию преобразования, тогда это произойдет:
Исключение в потоке "main" org.apache.spark.SparkException: задача не сериализуется в org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 298) в org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean (ClosureCleaner.scala: 288) в org.apache.spark.util.ClosureCleaner $ .clean (ClosureCleaner.scala: 108) в org.apache.spark.SparkContext.clean (SparkContext.scala: 2094) в org.apache.spark.streaming.dstream.DStream $$ anonfun $ map $ 1.apply (DStream.scala: 546) в org.apache.spark.streaming.dstream.DStream$$ anonfun $ map $ 1.apply (DStream.scala: 546) в org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 151) в org.apache.spark.rdd.RDDOperationScope $ .withScope (RDOperation).scala: 112) в org.apache.spark.SparkContext.withScope (SparkContext.scala: 701)
в org.apache.spark.streaming.StreamingContext.withScope (StreamingContext.scala: 264) в org.apache.spark.streaming.dstream.DStream.map(DStream.scala: 545)
в com.ptnj.streaming.alertJob.InstanceAlertJob $ .streaming_main (InstanceAlertJob.scala: 38) в com.ptnj.streaming.AlarmMain $ .create_ssc (AlarmMain.scala: 36) в com.ptnj.streaming.AlarmMain $ .main (AlarmMain.scala: 14) at com.ptnj.streaming.AlarmMain.main (AlarmMain.scala) Причина: java.io.NotSerializableException: org.apache.spark.SparkContext Стек сериализации:- объект не сериализуем (класс: org.apache.spark.SparkContext, значение: org.apache.spark.SparkContext@5fb7183b) - поле (класс: com.ptnj.streaming.alertJob.InstanceAlertJob $$ anonfun $ 1, имя: sc $ 1, тип: class org.apache.spark.SparkContext) - объект (класс com.ptnj.streaming.alertJob.InstanceAlertJob $$ anonfun $ 1,) вorg.apache.spark.serializer.SerializationDebugger $ .improveException (SerializationDebugger.scala: 40) в org.apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala: 46) в org.apache.spark.serializer.JavaS(JavaSerializer.scala: 100) at org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 295) ... еще 14
И я никогда раньше не сталкивался с такой ситуацией.Каждый пример показывает, что широковещательные переменные будут создаваться в выходной операции, а не в функции преобразования, так возможно ли это?