Пишу приложение для чтения данных из кафки топи c. И я не могу добиться отказоустойчивости в случае отказа водителя. Приложение работает в кластере k8s с использованием spark submit. Когда я запускаю свое приложение в первый раз, все идет хорошо, но когда я удаляю pod из кластера, перезапуск приложения приводит к ошибке. Checkpoint directory does not exist: file:/opt/spark/.../rdd-541
. Я использую отказоустойчивое хранилище. Ниже приведен фрагмент кода и ошибка более подробно. Спасибо за помощь. Дайте мне знать, если немного деталей.
def functionToCreateContext():
sc = SparkContext("spark-master", "kafka_spark")
sc.setLogLevel('ERROR')
ssc = StreamingContext(sc, 20)
kafkaParams = {'bootstrap.servers': 'kafka.cluster'}
kafkaStream = KafkaUtils.createDirectStream(ssc, ['topic'], kafkaParams)
statistic_window = kafkaStream.transform(parse_reduce).reduceByKeyAndWindow(lambda x, y: x + y,
lambda x, y: x - y,60, 20)
top = statistic_window.transform(found)
top.pprint()
ssc.checkpoint(cpd)
return ssc
if __name__ == "__main__":
ssc = StreamingContext.getOrCreate(cpd, functionToCreateContext)
ssc.start()
ssc.awaitTermination()
traceback:
20/03/10 14:05:19 INFO SparkContext: Created broadcast 1 from checkpointFile at DStreamCheckpointData.scala:114
Traceback (most recent call last):
File "/app-spark/kafka_spark.py", line 75, in <module>
ssc = StreamingContext.getOrCreate(cpd, functionToCreateContext)
File "/opt/spark/python/lib/pyspark.zip/pyspark/streaming/context.py", line 105, in getOrCreate
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2.tryRecoverFromCheckpoint.
: java.lang.IllegalArgumentException: requirement failed: Checkpoint directory does not exist: file:/opt/spark/.../rdd-541
at scala.Predef$.require(Predef.scala:224)