Я запускаю приложение Spark SQL, написанное на JAVA, на кластере EMR. Я пробовал разные варианты памяти, но все еще не смог сократить время выполнения.
JAR развернут в кластере, и я попробовал следующие варианты в кластере EMR с 1 узлом MASTER, 3 узлами CORE и 3 узлами TASK [Все они имеют тип экземпляра большого размера m4.4x]
Я использую spark-submit для запуска JAR из командной строки.
spark-submit --class <ClassPath of Main> --master yarn --executor-memory 19G --num-executors 17 MyCode.jar
spark-submit --class <ClassPath of Main> --master yarn --executor-memory 50G --num-executors 5 MyCode.jar
spark-submit --class <ClassPath of Main> --master yarn --executor-memory 40G --num-executors 5 MyCode.jar
spark-submit --class <ClassPath of Main> --master yarn --executor-memory 2G --num-executors 20 MyCode.jar
spark-submit --class <ClassPath of Main> --master yarn MyCode.jar
Все вышеперечисленные казни заняли более 3 часов, лучшие из них были одно без каких-либо аргументов памяти и первое с 17 исполнителями и 19G памяти. Но все еще очень медленно.
Входные данные представляют собой список файлов паркета, который содержит около 10 миллионов строк и 18 столбцов в каждой строке.
Я также пытался изменить количество паркетных файлов в качестве входных данных.
то есть;
10 миллионов строк распределены по 25 файлам
10 миллионов строк распределены по 5 файлам
10 миллионов строк распределены по 3 файлам
больший размер файла также снижает производительность приложения.
Средний размер входного файла - 1 Г паркетных файлов.
Ниже приведены фрагменты кода Java:
SparkSession spark = SparkSession.builder().appname("test").getOrCreate();
StructType schema = getSchemaForTable(input);
DataSet<Row> inFiles=spark.read().option("header",false).schema(schema).parquet(InputFolderInS3);
inFiles.createOrReplcaeTempView("TABLE1");
DataSet<Row> oneDataSet = spark.sql("select col1, col2 from TABLE1 where key IN (val1,val2,val3)");
oneDataSet.write().mode("overwrite").parquet(s3TargetForOne);
DataSet<Row> twoDataSet = spark.sql("select col1, col2 from TABLE1 where key NOT IN (select key from TABLE1 where <condition>)");
twoDataSet.write().mode("overwrite").parquet(s3TargetForTwo);
DataSet<Row> threeDataSet = spark.sql("select col5, col6 from TABLE1 where key IN (val1,val2,val3)");
threeDataSet.write().mode("overwrite").parquet(s3TargetForThree);
DataSet<Row> fourDataSet = spark.sql("select col5, col6 from TABLE1 where key NOT IN (select key from TABLE1 where <condition>)");
fourDataSet.write().mode("overwrite").parquet(s3TargetForFour);
Требуется предложение по оптимизации кода / конфигурации для искры для повышения производительности.