Закрытие Spark Streaming Context после первого пакета (попытка получить смещения kafka) - PullRequest
0 голосов
/ 11 декабря 2018

Я пытаюсь получить смещения Кафки для моей работы Spark Batch.После получения смещений я хотел бы закрыть контекст потока.

Я попытался добавить потоковый прослушиватель в контекст потока и реализовать метод onBatchCompleted, чтобы закрыть поток после завершения задания, но я получаю исключение «Не удается остановить StreamingContext в потоке шины слушателя» .

Есть ли решение для этого?Я пытаюсь получить смещения для вызова KafkaUtils.createRDD (sparkContext, kafkaProperties, OffsetRange [], LocationStrateg)

private OffsetRange[] getOffsets(SparkConf sparkConf) throws InterruptedException {
    final AtomicReference<OffsetRange[]> atomicReference = new AtomicReference<>();

    JavaStreamingContext sc = new JavaStreamingContext(sparkConf, Duration.apply(50));
    JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(sc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(Arrays.asList("test"), getKafkaParam()));
    stream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, String>>>) rdd -> {
                atomicReference.set(((HasOffsetRanges) rdd.rdd()).offsetRanges());
                // sc.stop(false); //this would throw exception saying consumer is already closed
            }
    );
    sc.addStreamingListener(new TopicListener(sc)); //Throws exception saying "Cannot stop StreamingContext within listener bus thread."
    sc.start();
    sc.awaitTermination();
    return atomicReference.get();
}



public class TopicListener implements StreamingListener {
private JavaStreamingContext sc;

public TopicListener(JavaStreamingContext sc){
    this.sc = sc;
}
@Override
public void onBatchCompleted(StreamingListenerBatchCompleted streamingListenerBatchCompleted) {
    sc.stop(false);
}

Большое спасибо stackoverflow-ers :) Я пытался найти возможные решения, но пока не нашелбыл успешным

Редактировать : Я использовал KafkaConsumer для получения информации о разделах.Получив информацию о разделах, я создаю список pojos TopicPartition и вызываю методы position и endOffsets, чтобы получить текущую позицию моего groupId и конечную позицию соответственно.

final List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor("theTopicName");
final List<TopicPartition> topicPartitions = new ArrayList<>();
partitionInfos.forEach(partitionInfo -> topicPartitions.add(new TopicPartition("theTopicName", partitionInfo.partition())));
final List<OffsetRange> offsetRanges = new ArrayList<>();
kafkaConsumer.assign(topicPartitions);
topicPartitions.foreach(topicPartition -> {
    long fromOffset = kafkaConsumer.position(topicPartition);
    kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
    long untilOffset = kafkaConsumer.position(topicPartition);
    offsetRanges.add(new OffsetRange(topicPartition.topic(), topicPartition.partition(), fromOffset, untilOffset));
});
return offsetRanges.toArray(new OffsetRange[offsetRanges.size()]);

1 Ответ

0 голосов
/ 11 декабря 2018

Если вы хотите контролировать поток, вы можете использовать опрос вместо потокового API.Таким образом, вы сможете четко остановить опрос, как только ваши цели будут достигнуты.

Также проверьте это ...

https://github.com/dibbhatt/kafka-spark-consumer

...