Как прервать длинную линию rdd, чтобы избежать переполнения стека - PullRequest
0 голосов
/ 27 марта 2019

Я пытаюсь объединить большое количество маленьких файлов avro (в формате hdf) в файл паркета. Похоже, если в этом каталоге есть тонна avro-файлов, я получаю ОШИБКУ yarn.ApplicationMaster: Исключение класса пользователя: java.lang.StackOverflowError

Ошибка:

19/03/26 15:14:14 INFO avro.AvroRelation: Listing hdfs://rc-hddd701.dev.local:8020/ur/source/Avro/urlog-avro-dm.dmlog/2019/03/21/20190321235700-dm-appd703-9abf19b8-2f6f-4341-87d7-74c0175e980d.avro on driver
19/03/26 15:14:14 INFO avro.AvroRelation: Listing hdfs://rc-hddd701.dev.local:8020/ur/source/Avro/urlog-avro-dm.dmlog/2019/03/21/20190321235800-DM-APPTSTD701-6af176ba-68f8-4420-b1b0-2f2be6abf003.avro on driver
19/03/26 15:14:14 INFO avro.AvroRelation: Listing hdfs://rc-hddd701.dev.local:8020/ur/source/Avro/urlog-avro-dm.dmlog/2019/03/21/20190321235800-dm-appd701-70b0ff1c-1664-4ce7-8321-149e12961627.avro on driver
19/03/26 15:14:14 INFO avro.AvroRelation: Listing hdfs://rc-hddd701.dev.local:8020/ur/source/Avro/urlog-avro-dm.dmlog/2019/03/21/20190321235800-dm-appd702-3dcbe094-14c9-4a4f-b326-57256df78b50.avro on driver
19/03/26 15:14:14 INFO avro.AvroRelation: Listing hdfs://rc-hddd701.dev.local:8020/ur/source/Avro/urlog-avro-dm.dmlog/2019/03/21/20190321235800-dm-appd703-a8a3ef8b-4dc0-41c1-a69a-2ef432fee0af.avro on driver
19/03/26 15:14:56 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.StackOverflowError
java.lang.StackOverflowError
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

код, который я использую

val df_array = filePaths.map(path => sqlContext.read.format("com.databricks.spark.avro").load(path.toString))
      val df_mid = df_array.reduce((df1, df2) => df1.unionAll(df2))
      val df = df_mid
        .withColumn("dt", date_format(df_mid.col("timeStamp"), "yyyy-MM-dd"))
        .filter("dt != 'null'")
      df
        .repartition(df.col("dt"))  //repartition vs coalese: https://stackoverflow.com/questions/31610971/spark-repartition-vs-coalesce
        .write.partitionBy("dt")
        .mode(SaveMode.Append)
        .option("compression","snappy")
        .parquet(avroConsolidator.parquetFilePathSpark.toString)

Где filePaths - Массив [Путь].

Этот код работает, если я пытаюсь обработать меньшее количество путей.

После того, как я немного погуглил, я обнаружил, что указание контрольной точки на фрейме данных может быть решением проблемы, но я не уверен, как этого добиться.

Версия Spark: 2.1

...