The process should keep running while a topic is deleted and re-created
0 votes
18 December 2018

My consumer process died when the topic was deleted. But the program is simply stuck and keeps printing these logs:

[2018-12-18 12:15:58] ArrayBuffer(org.apache.spark.SparkException: Error getting partition metadata for 'dp.dtg'. Does the topic exist?)
[2018-12-18 12:15:58] Error generating jobs for time 1545102958000 ms
java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
        at scala.Predef$.require(Predef.scala:224)
        at org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38)
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
[2018-12-18 12:15:59] Error generating jobs for time 1545102959000 ms
java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
        ... (same stack trace as above)

(The same log repeats over and over.)

How can I handle the consumer without restarting my program? Here is my code:

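// NOTE: these Properties are built here but never passed to the direct stream below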
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapIP);
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "dTest-analysis");
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, TestPayloadDeSerializer.class);

SparkConf conf = new SparkConf().setMaster(sparkURL).setAppName("test");
JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(1));

Collection<String> topics = Arrays.asList(ShareInfo.getKafkaTopic());
Set<String> set = new TreeSet<>(topics);

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", bootstrapIP);
kafkaParams.put("metadata.broker.list", bootstrapIP);
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "com.test.spark.kafka.TestPayloadDeSerializer");

JavaPairInputDStream<String, TestRequestBody> stream =
        KafkaUtils.createDirectStream(
                context,
                String.class,
                TestRequestBody.class,
                StringDecoder.class,
                TestPayloadDecoder.class,
                kafkaParams,
                set
                );

stream.foreachRDD(new VoidFunction<JavaPairRDD<String, TestRequestBody>>() {
    @Override
    public void call(JavaPairRDD<String, TestRequestBody> rdd) {
        // Do something
    }
});

context.start();
context.awaitTermination();

How can I catch the "Error getting partition metadata" error?
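
From the stack trace it looks like the exception is raised on the driver inside DirectKafkaInputDStream.compute and only logged by the JobGenerator, so a try/catch around context.start() or context.awaitTermination() never sees it. The only workaround I can think of is to probe the topic metadata myself with a plain KafkaConsumer. A minimal sketch of what I mean (topicExists is my own helper, and it assumes a 0.9+ kafka-clients jar on the classpath):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class TopicProbe {

    // Returns true while the broker still reports the topic in its metadata.
    // listTopics() is used instead of partitionsFor(topic) because a metadata
    // request naming a single topic can auto-create it when the broker has
    // auto.create.topics.enable=true.
    public static boolean topicExists(String bootstrapIP, String topic) {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapIP);
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> probe = new KafkaConsumer<>(p)) {
            return probe.listTopics().containsKey(topic);
        }
    }
}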

How can I handle the consumer without restarting my program?

But if the program does have to restart, how can I check the consumer's state?
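
To make the question concrete, this is roughly the supervisor loop I have in mind, built around awaitTerminationOrTimeout. Again only a sketch: createStreamingContext() stands for the setup code above, topicExists() is the probe from the previous snippet, and since (as far as I know) a stopped StreamingContext cannot be restarted, a new context is built on every iteration:

import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class StreamSupervisor {

    // Stands for the setup code in the question: SparkConf, JavaStreamingContext,
    // createDirectStream(...), foreachRDD(...). (Hypothetical helper.)
    static JavaStreamingContext createStreamingContext() {
        throw new UnsupportedOperationException("build the context as shown above");
    }

    // Supervisor loop: run the stream, and when the topic disappears, stop the
    // context and start a fresh one after the topic has been re-created.
    static void runForever(String bootstrapIP, String topic) throws InterruptedException {
        while (true) {
            JavaStreamingContext context = createStreamingContext();
            context.start();

            // Poll once per second instead of blocking forever in awaitTermination().
            while (TopicProbe.topicExists(bootstrapIP, topic)) {
                if (context.awaitTerminationOrTimeout(1000)) {
                    return; // the context stopped on its own
                }
            }

            // Topic is gone: stop streaming and the SparkContext too, so that a
            // brand-new context can be created below (one SparkContext per JVM).
            context.stop(true, false);

            // Wait until the topic exists again, then loop and restart.
            while (!TopicProbe.topicExists(bootstrapIP, topic)) {
                Thread.sleep(1000);
            }
        }
    }
}

If I understand the 0.8-style direct stream correctly, it tracks offsets itself rather than committing them to Kafka or ZooKeeper, so after the topic is re-created a fresh context would simply start from wherever auto.offset.reset points. Is that the right way to reason about the consumer's state across a restart?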

Thanks.
