Выполнение запросов SparkSQL медленнее, чем моя база данных - PullRequest
0 голосов
/ 29 марта 2019

Приветствие

Я создал кластер Spark 2.1.1 в Amazon EC2 с экземпляром типа m4.large, состоящим из 1 главного и 5 подчиненных для запуска. Моя база данных PostgreSQL 9.5 (t2.large) содержит таблицу из более чем 2 миллиардов строк и 7 столбцов, которые я хотел бы обработать. Я следовал указаниям веб-сайта Apache Spark и других источников о том, как подключать и обрабатывать эти данные.

Моя проблема в том, что производительность Spark SQL намного ниже, чем в моей базе данных. Мой SQL-оператор (см. Ниже в коде) занимает около 21 минуты в PSQL, но Spark SQL занимает около 42 минут, чтобы закончить. Моя главная цель - измерить производительность PSQL против Spark SQL, и пока я не получаю желаемых результатов. Буду признателен за помощь.

Спасибо

Я пытался увеличить fetchSize с 10000 до 100000, кэшировать фрейм данных, увеличить числовое разделение до 100, установить spark.sql.shuffle до 2000, удвоить размер моего кластера и использовать больший тип экземпляра, и пока я не видел никаких улучшений .

val spark = SparkSession.builder()
                        .appName("Spark SQL")
                        .getOrCreate();
val jdbcDF = spark.read.format("jdbc")
                  .option("url", DBI_URL)
                  .option("driver", "org.postgresql.Driver")
                  .option("dbtable", "ghcn_all")
                  .option("fetchsize", 10000)
                  .load()
                  .createOrReplaceTempView("ghcn_all");

val sqlStatement = "SELECT ghcn_date, element_value/10.0 
FROM ghcn_all 
WHERE station_id = 'USW00094846' 
      AND (ghcn_date >= '2015-01-01' AND ghcn_date <= '2015-12-31') 
      AND qflag IS NULL 
      AND element_type = 'PRCP' 
ORDER BY ghcn_date";

val sqlDF = spark.sql(sqlStatement);

var start:Long = System.nanoTime;
val num_rows:Long = sqlDF.count();
var end:Long = System.nanoTime;
println("Total Row                : " + num_rows);
println("Total Collect Time Lapse : " + ((end - start) / 1000000) + " ms");

Ответы [ 2 ]

2 голосов
/ 29 марта 2019

Нет веских причин для того, чтобы этот код работал на Spark быстрее, чем одна база данных. Прежде всего, он даже не распространяется, так как вы допустили ту же ошибку, что и многие до вас, и не разбивает данные на части .

Но более важно то, что вы фактически загружаете данные из базы данных - в результате он должен выполнить как минимум столько же работы (и на практике больше), затем отправить данные по сети, а затем данные должны быть проанализированы Spark. и обработано. Вы в основном делаете больше работы и ожидаете, что все будет быстрее - этого не произойдет.

Если вы хотите надежно улучшить производительность на Spark, вы должны как минимум:

  • Извлечение данных из базы данных.
  • Запись в эффективное (например, не S3) распределенное хранилище.
  • Используйте правильные сегментирование и разбиение для включения сокращения разделов и предиката pushdown.

Тогда вам может быть не хватать. Но опять же, правильная индексация ваших данных в кластере также должна повысить производительность, вероятно, при более низких общих затратах.

1 голос
/ 29 марта 2019

Очень важно установить partitionColumn при чтении из SQL.Это используют для параллельного запроса.Поэтому вы должны решить, какой столбец является вашим partitionColumn.

В вашем случае, например:

val jdbcDF = spark.read.format("jdbc")
                  .option("url", DBI_URL)
                  .option("driver", "org.postgresql.Driver")
                  .option("dbtable", "ghcn_all")
                  .option("fetchsize", 10000)
                  .option("partitionColumn", "ghcn_date")
                  .option("lowerBound", "2015-01-01")
                  .option("upperBound", "2015-12-31")
                  .option("numPartitions",16 )
                  .load()
                  .createOrReplaceTempView("ghcn_all");

Дополнительная информация:

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...