Искры потокового Кафки бросая универсальный `kafka.common.UnknownException` - PullRequest
0 голосов
/ 29 мая 2019

Наше искровое задание - это очень простое потоковое приложение Kafka, которое потребляет пакет событий, выполняет некоторое агрегирование и запись в хранилище данных Cassandra.

  • Мы запускаем задание на Spark v1.6.1 в кластере автономного режимаmode.
  • События Kafka были использованы с использованием spark-streaming-kafka_2.11:1.6.1 для этих заданий.Это тянет в kafka_2.11: 0.8.2.1 транзитивно.

Зависимости, возникающие при использовании spark-streaming-kafka_2.11: 1.6.1

+- org.apache.spark:spark-streaming-kafka_2.11:jar:1.6.1:compile
 |  \- org.apache.kafka:kafka_2.11:jar:0.8.2.1:compile
 |     +- com.yammer.metrics:metrics-core:jar:2.2.0:compile
 |     +- com.101tec:zkclient:jar:0.3:compile
 |     \- org.apache.kafka:kafka-clients:jar:0.8.2.1:compile
org.apache.spark.SparkException: Job aborted due to stage failure: Task 41 in stage 10982.0 failed 4 times, most recent failure: Lost task 41.3 in stage 10982.0 (TID 105719, 172.23.11.121): kafka.common.UnknownException
    at sun.reflect.GeneratedConstructorAccessor27.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:413)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:413)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
    at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1.apply(DStreamFunctions.scala:54)
    at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1.apply(DStreamFunctions.scala:54)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: kafka.common.UnknownException
    at sun.reflect.GeneratedConstructorAccessor27.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:413)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:413)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    ... 3 more

Изначально у меня сложилось впечатление, что этоОбновление версии брокера kafka нам пришлось сделать в среде.Но этот же код работает нормально в одной среде с такой же конфигурацией, но не работает в другой.Так что не совсем уверен, как отладить эту ситуацию.Был бы признателен, если кто-нибудь может помочь мне выяснить, что происходит с этой трассировкой стека.Кроме того, на Cassandra нет журналов ошибок / предупреждений.

Итак, я отчасти заблудился, что делать дальше.Дайте мне знать, если вам нужна дополнительная информация о том же.Рад предоставить более подробную информацию.

...