java.nio.BufferUnderflowException when trying to read messages from Kafka with a Spark Streaming job

My problem is that I cannot print the data consumed from Kafka: whenever I try to process the DStream received from Kafka, I get a java.nio.BufferUnderflowException. I cannot tell whether this is a configuration problem or a problem with the way I process the RDDs.

I have tried changing the configuration and the way the RDDs are processed (a sketch of one of the configuration variants I tried is shown after the code below), but nothing has solved it.

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
object GeoElaborator {
  def main(args: Array[String]): Unit = {

    val ssc = new StreamingContext("local[*]","GeoElaborator",Seconds(1))
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "h1:9092,h2:9092,h03:9092",
      "group.id" -> "GeoElaborator")
    val topic = List("topic1").toSet
    val record = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
    record.foreachRDD(rdd => {
      rdd.foreach(msg => println(msg))
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
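
To make "changing the configuration" concrete, one of the kafkaParams variants I tried looked roughly like the sketch below (dropped into the same main method, so it reuses ssc and topic from the code above). The extra keys are standard settings of the old Kafka simple consumer that the 0.8 direct stream uses; the values are only examples, the tunedKafkaParams/tunedRecord names are purely illustrative, and none of these changes made the exception go away.

// Sketch of one configuration variant I tried; values are examples, not a known fix
val tunedKafkaParams = Map[String, String](
  "bootstrap.servers"           -> "h1:9092,h2:9092,h03:9092",
  "group.id"                    -> "GeoElaborator",
  "auto.offset.reset"           -> "smallest",                 // start from the earliest available offsets
  "fetch.message.max.bytes"     -> (8 * 1024 * 1024).toString, // larger per-partition fetch size
  "socket.receive.buffer.bytes" -> (8 * 1024 * 1024).toString  // larger socket receive buffer
)
val tunedRecord = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, tunedKafkaParams, topic)
tunedRecord.foreachRDD(rdd => rdd.foreach(msg => println(msg)))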

I submit the job with spark-submit --master yarn --deploy-mode client --class GeoElaborator GeoElaborator-1.0-SNAPSHOT.jar.
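
In case the packaging is relevant: the code uses the old spark-streaming-kafka (0.8) direct-stream API, which I assume is bundled into GeoElaborator-1.0-SNAPSHOT.jar as an assembly. An equivalent submit that resolves the connector explicitly would look roughly like the line below; the spark-streaming-kafka_2.10:1.6.3 coordinate is my guess at the versions in use (the stack trace looks like a Spark 1.6-era build), not something confirmed here.

# Sketch only: the same submit with the 0.8 connector pulled in via --packages (version is an assumption)
spark-submit --master yarn --deploy-mode client \
  --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.3 \
  --class GeoElaborator GeoElaborator-1.0-SNAPSHOT.jar

This is the output of the run: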

19/10/11 13:55:06 ERROR scheduler.StreamingListenerBus: StreamingListenerBus has already stopped! Dropping event StreamingListenerBatchCompleted(BatchInfo(1570794906000 ms,Map(0 -> StreamInputInfo(0,1,Map(offsets -> List(OffsetRange(topic: 'meteorites', partition: 2, range: [10993 -> 10994]), OffsetRange(topic: 'meteorites', partition: 1, range: [10996 -> 10996]), OffsetRange(topic: 'meteorites', partition: 0, range: [10997 -> 10997])), Description -> topic: meteorites partition: 2    offsets: 10993 to 10994))),1570794906058,Some(1570794906217),Some(1570794906361),Map(0 -> OutputOperationInfo(1570794906000 ms,0,print at GeoElaborator.scala:18,org.apache.spark.streaming.dstream.DStream.print(DStream.scala:757)
GeoElaborator$.main(GeoElaborator.scala:18)
GeoElaborator.main(GeoElaborator.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala),Some(1570794906209),Some(1570794906359),Some(org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 5, localhost, executor driver): java.nio.BufferUnderflowException
        at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
        at java.nio.ByteBuffer.get(ByteBuffer.java:694)
        at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:40)
        at kafka.api.TopicData$.readFrom(FetchResponse.scala:96)
        at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
        at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.immutable.Range.foreach(Range.scala:141)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1644)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1603)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1592)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1862)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1875)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1888)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1328)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$5$1.apply(DStream.scala:768)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$5$1.apply(DStream.scala:767)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.BufferUnderflowException
        at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
        at java.nio.ByteBuffer.get(ByteBuffer.java:694)
        at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:40)
        at kafka.api.TopicData$.readFrom(FetchResponse.scala:96)
        at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
        at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.immutable.Range.foreach(Range.scala:141)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
        ... 3 more
)))))


...