Spark Kafka Streaming Consumer - задержка в совершении смещения для Kafka - PullRequest
0 голосов
/ 07 февраля 2019

Я читаю потоковые сообщения, используя KafkaUtils.createDirectStream и использую commitAsync для фиксации смещения после их обработки (т.е. после их загрузки в таблицу Hive).Я вижу, что между загрузкой улья и фиксацией смещения существует определенный временной интервал, примерно от половины минуты до одной минуты.Другое дело, что метод фиксации запускается с определенной частотой.Как упомянуто выше, он срабатывает каждые полминуты и не зависит от времени / частоты поступления данных.

Таким образом, в этом случае, если мое приложение будет убито за время досмещение сохраняется, это происходит, чтобы прочитать этот конкретный пакет данных снова.Могу ли я контролировать время / частоту, когда смещение фиксируется (т. Е. Срабатывает commitAsyn), чтобы уменьшить задержку?Или вы думаете, что commitSync поможет в этом случае?И если да, могу ли я использовать commitSync с KafkaUtils.createDirectStream для чтения сообщений?Можете ли вы привести пример, как реализовать API commitSync с KafkaUtils.createDirectStream?

public void method1(SparkConf conf,String app) 
spark = SparkSession.builder().appName(conf.get("")).enableHiveSupport().getOrCreate();
final JavaStreamingContext javaStreamContext = new JavaStreamingContext(context,
            new Duration(<spark duration>));
JavaInputDStream<ConsumerRecord<String, String>> recs= KafkaUtils.createDirectStream(javaStreamContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String> Subscribe(<topicnames>, <kafka Params>));
recs.foreachRDD(rdd -> {
        OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
        JavaRDD<String> javardd = rdd.map(tuple2 -> tuple2.value() );
        if(!javardd.isEmpty()) {
           JavaRDD<String> cache = javardd.cache();
            <MethodToLoadIntoHive>
          ((CanCommitOffsets)recs.inputDStream()).commitAsync(offsetRanges);
        }
    });
javaStreamContext.start();
javaStreamContext.awaitTermination();

}

...