У меня есть приложение Spark Streaming (Spark 2.4.5, kafka 0.8.2), которое я в настоящее время тестирую на производительность с сообщениями разных размеров.
def create_context():
logger.info("Creating new context - not from checkpoint")
print("Creating new context")
ssc = StreamingContext(sc, 5)
opts = {"metadata.broker.list": globals()['kafka_hosts'],
"auto.offset.reset": "largest",
"group.id": "test_v1"}
kvs = KafkaUtils.createDirectStream(ssc, [topic_listen], opts)
kvs.checkpoint(120)
print("Processing started")
lines = kvs.map(lambda row: row[1])
lines.foreachRDD(my_streaming_function)
ssc.checkpoint(checkpoint)
return ssc
ssc = StreamingContext.getOrCreate(checkpoint, lambda: create_context())
ssc.start()
Приложение запускается, обработка начинается, но через некоторое время точка (не в состоянии догнать сообщения) Я получаю следующую (ожидаемую) ошибку в определенном пакете:
Caused by: kafka.common.OffsetOutOfRangeException
at sun.reflect.GeneratedConstructorAccessor33.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:188)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:198)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Проблема в том, что приложение никогда не восстанавливается из этого состояния. После «догоняющего» всех пакетов (не ожидающих активных пакетов), когда я публикую sh любые новые сообщения, ошибка по-прежнему сохраняется, даже если сообщение было создано за несколько секунд до его обработки.
I Я бы подумал, что параметр «auto.offset.reset = Large» сбрасывает смещения автоматически, но я думаю, что это применимо только для запуска приложения. Какие варианты у меня есть для автоматического c восстановления?