Отказоустойчивые для Kafka Direct Stream не работают. Каталог контрольных точек не существует - PullRequest
1 голос
/ 10 марта 2020

Пишу приложение для чтения данных из кафки топи c. И я не могу добиться отказоустойчивости в случае отказа водителя. Приложение работает в кластере k8s с использованием spark submit. Когда я запускаю свое приложение в первый раз, все идет хорошо, но когда я удаляю pod из кластера, перезапуск приложения приводит к ошибке. Checkpoint directory does not exist: file:/opt/spark/.../rdd-541. Я использую отказоустойчивое хранилище. Ниже приведен фрагмент кода и ошибка более подробно. Спасибо за помощь. Дайте мне знать, если немного деталей.

def functionToCreateContext():
    sc = SparkContext("spark-master", "kafka_spark")
    sc.setLogLevel('ERROR')
    ssc = StreamingContext(sc, 20)
    kafkaParams = {'bootstrap.servers': 'kafka.cluster'}
    kafkaStream = KafkaUtils.createDirectStream(ssc, ['topic'], kafkaParams)
    statistic_window = kafkaStream.transform(parse_reduce).reduceByKeyAndWindow(lambda x, y: x + y, 
                     lambda x, y: x - y,60, 20)
    top = statistic_window.transform(found)
    top.pprint()
    ssc.checkpoint(cpd)
    return ssc

if __name__ == "__main__":
    ssc = StreamingContext.getOrCreate(cpd, functionToCreateContext)
    ssc.start()
    ssc.awaitTermination()

traceback:

20/03/10 14:05:19 INFO SparkContext: Created broadcast 1 from checkpointFile at DStreamCheckpointData.scala:114
Traceback (most recent call last):
  File "/app-spark/kafka_spark.py", line 75, in <module>
    ssc = StreamingContext.getOrCreate(cpd, functionToCreateContext)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 105, in getOrCreate
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2.tryRecoverFromCheckpoint.
: java.lang.IllegalArgumentException: requirement failed: Checkpoint directory does not exist: file:/opt/spark/.../rdd-541
        at scala.Predef$.require(Predef.scala:224)

1 Ответ

1 голос
/ 10 марта 2020

Я предполагаю, что вы получаете эту ошибку, потому что ваш каталог контрольных точек не является постоянным томом. Поэтому при удалении модуля указанный каталог также удаляется, и вы получаете эту ошибку Checkpoint directory does not exist.

Решением будет использование Постоянный том для каталога контрольных точек.

Здесь вы также можете найти пример в точно такой же топике c .

...