Какие параметры я должен установить для обработки 100 ГБ CSV в Spark 1.6 - PullRequest
0 голосов
/ 11 февраля 2019

Я новичок в Spark, мой пример использования - обработать файл 100 Гб в spark и загрузить его в куст.У меня есть 2 узла 128 ГБ оперативной памяти в каждом кластере.Под обработкой я подразумеваю добавить дополнительный столбец к существующему csv, значение которого рассчитывается во время выполнения.Но каждый раз, когда я запускаю spark-submit, он не выдает следующую ошибку: -

Exception in thread "task-result-getter-1" java.lang.OutOfMemoryError: GC overhead limit exceeded
        at org.apache.spark.unsafe.types.UTF8String.read(UTF8String.java:1205)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:363)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:355)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
        at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1819)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Команда, которую я пытаюсь использовать, приведена ниже: -

spark-submit --master yarn-client \
             --executor-memory 8G --total-executor-cores 2 \
             --class "com.test.app.Test" \
             spark.old-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
             harshaltestdata   harshaltestdata  \
             --jars spark-csv_2.10-1.5.0.jar

Примечание:

  • harshaltestdata - это мое имя CSV в формате HDFS
  • harshaltestdata - это имя моей таблицы.

Я пробовал код для файла размером до 50 МБ ион работает нормально, но когда я пробую его с более чем 100 МБ, это не удается.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.lit

object Test {
  def main(args: Array[String]) {
    //table csv name as in
    val csvName = args(0)
    val tableName = args(1)
    System.setProperty("SPARK_YARN_MODE", "true");
    val sparkConfiguration = new SparkConf();
    sparkConfiguration.setMaster("yarn-client");
    sparkConfiguration.setAppName("test-spark-job");
    sparkConfiguration
      .set("spark.executor.memory", "12g")
      .set("spark.kryoserializer.buffer.max", "512")

    val sparkContext = new SparkContext(sparkConfiguration);
    println("started spark job")
    val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
    val hiveContext = new HiveContext(sparkContext)

    val data = hiveContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("mode", "DROPMALFORMED")
      .load("hdfs_path***" + csvName + ".csv");
    //Printing in lines
    data.collect().foreach(println)
    //Printing in tabular form
    data.show()
    val newdf = data.withColumn("date", lit("10-04-19"))
    newdf.withColumn("date", lit("10-04-19"))
    newdf.write.mode("append").saveAsTable(tableName)

    val d = hiveContext.sql("select * from " + tableName)
    d.show()
    }
    }

Ожидаемые результаты - файл должен быть обработан и загружен в Hive

1 Ответ

0 голосов
/ 11 февраля 2019

Никогда не используйте collect(), если он вам действительно не нужен, это вызовет проблемы с памятью, особенно если у вас большой файл CSV.

А вторая строка избыточна, вы можете удалить ее.

val newdf = data.withColumn("date", lit("10-04-19"))
newdf.withColumn("date", lit("10-04-19")) // It means nothing, you can remove it.
newdf.write.mode("append").saveAsTable(tableName)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...