Я новичок в Spark, мой пример использования - обработать файл 100 Гб в spark и загрузить его в куст.У меня есть 2 узла 128 ГБ оперативной памяти в каждом кластере.Под обработкой я подразумеваю добавить дополнительный столбец к существующему csv, значение которого рассчитывается во время выполнения.Но каждый раз, когда я запускаю spark-submit, он не выдает следующую ошибку: -
Exception in thread "task-result-getter-1" java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.spark.unsafe.types.UTF8String.read(UTF8String.java:1205)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:363)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.read(DefaultSerializers.java:355)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1819)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Команда, которую я пытаюсь использовать, приведена ниже: -
spark-submit --master yarn-client \
--executor-memory 8G --total-executor-cores 2 \
--class "com.test.app.Test" \
spark.old-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
harshaltestdata harshaltestdata \
--jars spark-csv_2.10-1.5.0.jar
Примечание:
harshaltestdata
- это мое имя CSV в формате HDFS harshaltestdata
- это имя моей таблицы.
Я пробовал код для файла размером до 50 МБ ион работает нормально, но когда я пробую его с более чем 100 МБ, это не удается.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.lit
object Test {
def main(args: Array[String]) {
//table csv name as in
val csvName = args(0)
val tableName = args(1)
System.setProperty("SPARK_YARN_MODE", "true");
val sparkConfiguration = new SparkConf();
sparkConfiguration.setMaster("yarn-client");
sparkConfiguration.setAppName("test-spark-job");
sparkConfiguration
.set("spark.executor.memory", "12g")
.set("spark.kryoserializer.buffer.max", "512")
val sparkContext = new SparkContext(sparkConfiguration);
println("started spark job")
val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
val hiveContext = new HiveContext(sparkContext)
val data = hiveContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("hdfs_path***" + csvName + ".csv");
//Printing in lines
data.collect().foreach(println)
//Printing in tabular form
data.show()
val newdf = data.withColumn("date", lit("10-04-19"))
newdf.withColumn("date", lit("10-04-19"))
newdf.write.mode("append").saveAsTable(tableName)
val d = hiveContext.sql("select * from " + tableName)
d.show()
}
}
Ожидаемые результаты - файл должен быть обработан и загружен в Hive