kafka.common.OffsetOutOfRangeException - PullRequest
       21

kafka.common.OffsetOutOfRangeException

0 голосов
/ 16 апреля 2019

Получаю приведенную ниже ошибку при потоковой передаче искры.Это работает отлично в течение нескольких дней, но после 4-5 дней появляется ошибка ниже.Может кто-нибудь, пожалуйста, помогите мне в этом.

kafka.common.OffsetOutOfRangeException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:374)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:102)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:229)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Я проверил свойства kafka и значение по умолчанию для log.retention.hours составляет 7 дней.Также выполнила приведенную ниже команду.

kafka-topics.sh --zookeeper localhost:2181 --describe --topics-with-overrides

Получил приведенный ниже вывод для темы, из которой читается.

Topic:consumer_events        PartitionCount:25       ReplicationFactor:3     Configs:retention.ms=43200000,min.insync.replicas=2,cleanup.policy=delete,compression.type=snappy,retention.bytes=644245094400

Может кто-нибудь, пожалуйста, помогите мне в этом.

Спасибо

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...