Программа Spark SQL работает медленно для любой конфигурации памяти - PullRequest
0 голосов
/ 13 января 2019

Я запускаю приложение Spark SQL, написанное на JAVA, на кластере EMR. Я пробовал разные варианты памяти, но все еще не смог сократить время выполнения.

JAR развернут в кластере, и я попробовал следующие варианты в кластере EMR с 1 узлом MASTER, 3 узлами CORE и 3 узлами TASK [Все они имеют тип экземпляра большого размера m4.4x]

Я использую spark-submit для запуска JAR из командной строки.

spark-submit --class <ClassPath of Main>  --master yarn   --executor-memory 19G   --num-executors 17   MyCode.jar
spark-submit --class <ClassPath of Main>  --master yarn   --executor-memory 50G   --num-executors 5   MyCode.jar
spark-submit --class <ClassPath of Main>  --master yarn   --executor-memory 40G   --num-executors 5   MyCode.jar
spark-submit --class <ClassPath of Main>  --master yarn   --executor-memory 2G   --num-executors 20   MyCode.jar
spark-submit --class <ClassPath of Main>  --master yarn   MyCode.jar 

Все вышеперечисленные казни заняли более 3 часов, лучшие из них были одно без каких-либо аргументов памяти и первое с 17 исполнителями и 19G памяти. Но все еще очень медленно.

Входные данные представляют собой список файлов паркета, который содержит около 10 миллионов строк и 18 столбцов в каждой строке.

Я также пытался изменить количество паркетных файлов в качестве входных данных. то есть; 10 миллионов строк распределены по 25 файлам 10 миллионов строк распределены по 5 файлам 10 миллионов строк распределены по 3 файлам

больший размер файла также снижает производительность приложения.

Средний размер входного файла - 1 Г паркетных файлов.

Ниже приведены фрагменты кода Java:

SparkSession spark = SparkSession.builder().appname("test").getOrCreate();

StructType schema = getSchemaForTable(input);

DataSet<Row> inFiles=spark.read().option("header",false).schema(schema).parquet(InputFolderInS3);

inFiles.createOrReplcaeTempView("TABLE1");

DataSet<Row> oneDataSet = spark.sql("select col1, col2 from TABLE1 where key IN (val1,val2,val3)");
oneDataSet.write().mode("overwrite").parquet(s3TargetForOne);

DataSet<Row> twoDataSet = spark.sql("select col1, col2 from TABLE1 where key NOT IN (select key from TABLE1 where <condition>)");
twoDataSet.write().mode("overwrite").parquet(s3TargetForTwo);

DataSet<Row> threeDataSet = spark.sql("select col5, col6 from TABLE1 where key IN (val1,val2,val3)");
threeDataSet.write().mode("overwrite").parquet(s3TargetForThree);

DataSet<Row> fourDataSet = spark.sql("select col5, col6 from TABLE1 where key NOT IN (select key from TABLE1 where <condition>)");
fourDataSet.write().mode("overwrite").parquet(s3TargetForFour);

Требуется предложение по оптимизации кода / конфигурации для искры для повышения производительности.

1 Ответ

0 голосов
/ 14 января 2019

Проблема была с самим SQL. У меня были внутренние запросы в предложении where, которые замедлили процесс. Я узнал об этом, ссылаясь на вкладку SparkUI-> SQL. Здесь я увидел, что для моего внешнего запроса был запущен broadCastLoop для каждой строки. Я исправил SQL, и время для 1G данных сократилось до 56 секунд.

...