Spark Streaming - автоматический сброс смещения Kafka (Spark 2.4.5, kafka 0.8.2) - PullRequest
0 голосов
/ 13 апреля 2020

У меня есть приложение Spark Streaming (Spark 2.4.5, kafka 0.8.2), которое я в настоящее время тестирую на производительность с сообщениями разных размеров.

def create_context():
    logger.info("Creating new context - not from checkpoint")
    print("Creating new context")
    ssc = StreamingContext(sc, 5)
    opts = {"metadata.broker.list": globals()['kafka_hosts'],
            "auto.offset.reset": "largest",
            "group.id": "test_v1"}
    kvs = KafkaUtils.createDirectStream(ssc, [topic_listen], opts)
    kvs.checkpoint(120)
    print("Processing started")
    lines = kvs.map(lambda row: row[1])
    lines.foreachRDD(my_streaming_function)
    ssc.checkpoint(checkpoint)
    return ssc

ssc = StreamingContext.getOrCreate(checkpoint, lambda: create_context())
ssc.start()

Приложение запускается, обработка начинается, но через некоторое время точка (не в состоянии догнать сообщения) Я получаю следующую (ожидаемую) ошибку в определенном пакете:

Caused by: kafka.common.OffsetOutOfRangeException
    at sun.reflect.GeneratedConstructorAccessor33.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:188)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:198)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:213)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

Проблема в том, что приложение никогда не восстанавливается из этого состояния. После «догоняющего» всех пакетов (не ожидающих активных пакетов), когда я публикую sh любые новые сообщения, ошибка по-прежнему сохраняется, даже если сообщение было создано за несколько секунд до его обработки.

I Я бы подумал, что параметр «auto.offset.reset = Large» сбрасывает смещения автоматически, но я думаю, что это применимо только для запуска приложения. Какие варианты у меня есть для автоматического c восстановления?

...