Драйверу не хватает памяти для задания потоковой передачи Spark - PullRequest
0 голосов
/ 26 февраля 2020

Я пытаюсь запустить задание структурированной потоковой передачи на основе файлов с источником S3. Эти файлы в формате JSON. Код читает вновь добавленные файлы в папку S3, анализирует атрибуты JSON и записывает данные обратно в S3 в формате паркета. Я запускаю задание Spark на кластере AWS EMR (версия - 5.29.0)

Streaming Code

def writeToOutput(inputDF, batchId):
    spark.sql("drop table if exists global_temp.source_df")

    inputDF.cache()

    inputDF.createGlobalTempView("source_df")

    df1 = spark.sql(df1_sql)
    df2 = spark.sql(df2_sql)
    df3 = spark.sql(df3_sql)    

    df1.repartition(1) \
    .write \
    .partitionBy("col1", "col2") \
    .format("parquet") \
    .mode('append') \
    .save(output_path + 'df1/')

    df2.repartition(1) \
    .write \
    .partitionBy("col1", "col2") \
    .format("parquet") \
    .mode('append') \
    .save(output_path + 'df2/')

    df3.repartition(1) \
    .write \
    .partitionBy("col1", "col2") \
    .format("parquet") \
    .mode('append') \
    .save(output_path + 'df3/')

    inputDF.unpersist()


inputDF = spark \
          .readStream \
          .schema(jsonSchema) \
          .option("latestFirst", "false") \
          .option("badRecordsPath", bad_records_path) \
          .option("maxFilesPerTrigger", "2000") \
          .json(input_path).withColumn('file_path', input_file_name())


query = inputDF.writeStream \
        .foreachBatch(writeToOutput) \
        .queryName("Stream") \
        .option("checkpointLocation", checkpoint_path) \
        .trigger(processingTime='180 seconds') \
        .start()

query.awaitTermination()

Мои spark-submit конфиги : spark-submit --master yarn --deploy-mode cluster --executor-memory 5G --executor-cores 4 --driver-memory 15G --num-executors 40 --conf spark.dynamicAllocation.enabled=false --conf yarn.resourcemanager.am.max-attempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h --conf spark.executor.memoryOverhead=1024 --conf spark.driver.memoryOverhead=512 --conf spark.yarn.max.executor.failures=300 --conf spark.yarn.executor.failuresValidityInterval=1h --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.streaming.stopGracefullyOnShutdown=true --conf spark.task.maxFailures=8 --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties

Мои задания выполняются в течение 1-2 часов и завершаются с ошибкой Container is running beyond physical memory limits. Current usage: 15.6 GB of 15.5 GB physical memory used; 19.1 GB of 77.5 GB virtual memory used. Killing container.

Я не могу понять, почему драйверу требуется так много памяти? Может кто-нибудь, пожалуйста, помогите

...