Как я могу эффективно распараллелить выполнение SparkSQL? - PullRequest
0 голосов
/ 27 апреля 2018

Окружающая среда

  • Скала
  • Apache Spark: Spark 2.2.1
  • EMR на AWS: emr-5.12.1

Содержание

У меня есть один большой DataFrame, как показано ниже:

val df = spark.read.option("basePath", "s3://some_bucket/").json("s3://some_bucket/group_id=*/")

JSON-файлы размером ~ 1 ТБ в s3://some_bucket включают 5000 разделов group_id. Я хочу выполнить преобразование с использованием SparkSQL, и оно отличается от каждого group_id.

Код Spark выглядит так:

// Create view
val df = spark.read.option("basePath", "s3://data_lake/").json("s3://data_lake/group_id=*/")
df.createOrReplaceTempView("lakeView")

// one of queries like this:
// SELECT 
//   col1 as userId,
//   col2 as userName,
//   .....
// FROM
//   lakeView
// WHERE
//   group_id = xxx;
val queries: Seq[String] = getGroupIdMapping

// ** Want to know better ways **
queries.par.foreach(query => {
  val convertedDF: DataFrame = spark.sql(query)
  convertedDF.write.save("s3://another_bucket/")
})

par может распараллелить на Runtime.getRuntime.availableProcessors num, и оно будет равно числу ядер драйверов.

Но это кажется странным и недостаточно эффективным, потому что это не имеет никакого отношения к парализации Спарка.

Я действительно хочу сделать что-то вроде groupBy в scala.collection.Seq.

Это неправильный искровой код:

df.groupBy(groupId).foreach((groupId, parDF) => {
  parDF.createOrReplaceTempView("lakeView")
  val convertedDF: DataFrame = spark.sql(queryByGroupId)
  convertedDF.write.save("s3://another_bucket")
})

1 Ответ

0 голосов
/ 27 апреля 2018

1) Прежде всего, если ваши данные уже сохранены в файлах по идентификатору группы, нет причин смешивать их, а затем группировать по идентификатору с помощью Spark. Гораздо проще и эффективнее загружать для каждого идентификатора группы только релевантные файлы

2) Spark сама распараллеливает вычисления. Поэтому в большинстве случаев нет необходимости во внешнем распараллеливании. Но если вы чувствуете, что Spark не использует все ресурсы, вы можете:

a) если каждое отдельное вычисление занимает менее нескольких секунд, тогда затраты на планирование задачи сопоставимы со временем выполнения задачи, поэтому можно получить ускорение, выполнив несколько задач параллельно.

b) вычисления занимают значительное количество времени, но ресурсы все еще используются недостаточно. Тогда, скорее всего, вам следует увеличить количество разделов для вашего набора данных.

3) Если вы наконец решили запустить несколько задач параллельно, это можно сделать следующим образом:

val parallelism = 10
val executor = Executors.newFixedThreadPool(parallelism)
val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
val tasks: Seq[String] = ???
val results: Seq[Future[Int]] = tasks.map(query => {
  Future{
    //spark stuff here
    0
  }(ec)
})
val allDone: Future[Seq[Int]] = Future.sequence(results)
//wait for results
Await.result(allDone, scala.concurrent.duration.Duration.Inf)
executor.shutdown //otherwise jvm will probably not exit 
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...