Я пытаюсь запустить задание структурированной потоковой передачи на основе файлов с источником S3. Эти файлы в формате JSON. Код читает вновь добавленные файлы в папку S3, анализирует атрибуты JSON и записывает данные обратно в S3 в формате паркета. Я запускаю задание Spark на кластере AWS EMR (версия - 5.29.0)
Streaming Code
def writeToOutput(inputDF, batchId):
spark.sql("drop table if exists global_temp.source_df")
inputDF.cache()
inputDF.createGlobalTempView("source_df")
df1 = spark.sql(df1_sql)
df2 = spark.sql(df2_sql)
df3 = spark.sql(df3_sql)
df1.repartition(1) \
.write \
.partitionBy("col1", "col2") \
.format("parquet") \
.mode('append') \
.save(output_path + 'df1/')
df2.repartition(1) \
.write \
.partitionBy("col1", "col2") \
.format("parquet") \
.mode('append') \
.save(output_path + 'df2/')
df3.repartition(1) \
.write \
.partitionBy("col1", "col2") \
.format("parquet") \
.mode('append') \
.save(output_path + 'df3/')
inputDF.unpersist()
inputDF = spark \
.readStream \
.schema(jsonSchema) \
.option("latestFirst", "false") \
.option("badRecordsPath", bad_records_path) \
.option("maxFilesPerTrigger", "2000") \
.json(input_path).withColumn('file_path', input_file_name())
query = inputDF.writeStream \
.foreachBatch(writeToOutput) \
.queryName("Stream") \
.option("checkpointLocation", checkpoint_path) \
.trigger(processingTime='180 seconds') \
.start()
query.awaitTermination()
Мои spark-submit
конфиги : spark-submit --master yarn --deploy-mode cluster --executor-memory 5G --executor-cores 4 --driver-memory 15G --num-executors 40 --conf spark.dynamicAllocation.enabled=false --conf yarn.resourcemanager.am.max-attempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h --conf spark.executor.memoryOverhead=1024 --conf spark.driver.memoryOverhead=512 --conf spark.yarn.max.executor.failures=300 --conf spark.yarn.executor.failuresValidityInterval=1h --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.streaming.stopGracefullyOnShutdown=true --conf spark.task.maxFailures=8 --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties
Мои задания выполняются в течение 1-2 часов и завершаются с ошибкой Container is running beyond physical memory limits. Current usage: 15.6 GB of 15.5 GB physical memory used; 19.1 GB of 77.5 GB virtual memory used. Killing container.
Я не могу понять, почему драйверу требуется так много памяти? Может кто-нибудь, пожалуйста, помогите