сохранить смещение Кафки на драйвере после завершения обработчика обработчика RDD в потоковом искре - PullRequest
0 голосов
/ 18 февраля 2019

У меня есть приложение Spark Streaming, которое читает из раздела Kafka. Я управляю смещениями вручную для восстановления.когда я рушу драйвер, он автоматически перезапускается, и последнее смещение, записанное в теме Кафки, выглядит хорошо.но я подозреваю, что он записывает смещение до того, как исполнитель завершит работу, поэтому я потерял последний кусок данных перед тем, как завершить его.

kafkaSparkInputDStream.foreachRDD(rdd -> {
         OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
         Object results = yourCalculation(rdd);
         rdd.for (OffsetRange offsetRange : offsetRanges) {
             if (offsetRange.count() >0) {
                 System.out.println("offsetRange. from [" + offsetRange.fromOffset() +"] until[" + offsetRange.untilOffset()+ "]. count [" + offsetRange.count()+ "] partition [" +offsetRange.partition()+"]" );
             }
         }
         ((CanCommitOffsets) kafkaSparkInputDStream.inputDStream()).commitAsync(offsetRanges);
...