Повышение производительности объединения и группирования с помощью Spark DataFrames - PullRequest
0 голосов
/ 21 декабря 2018

INPUT:

У меня есть два набора данных:

  1. samples_1 набор данных со следующими столбцами: timestamp, id, x, y и 500M записей.
  2. samples_2 набор данных, имеющий те же столбцы, что и sample_1, и записи 50M.

ПРИМЕЧАНИЯ:

  • В одном наборе данных timestamp и id образуют уникальный ключ каждой записи, т. Е. Могут быть продублированы только timestamp и id.
  • Для всех наборов данных id изодин набор данных не может быть реплицирован на другой.Тем не менее, timestamp может дублироваться в двух наборах данных.
  • Мой кластер содержит узел драйвера и пять подчиненных узлов, каждый из которых имеет 16 ядер и 64 ГБ ОЗУ.
  • Я назначаю 15исполнители для моей работы, у каждого по 5 ядер и 19 ГБ оперативной памяти.

ВОПРОС:

Я пытаюсь сделать следующее: для каждого (timestamp_1, id_1) кортеж в sample_1, мне нужно найти все (timestamp_2, id_2, x_2, y_2) s из sample_2, где timestamp_1 равно timestamp_2.

ЧТО Я ПРОБОВАЛ:

samples_2
  .withColumn("combined", struct("id", "x", "y"))
  .groupBy("timestamp")
  .agg(collect_list("combined").as("combined_list"))
  .join(samples_2, Seq("timestamp"), "rightouter")
  .map {
    case Row(timestamp: String, samples: mutable.WrappedArray[GenericRowWithSchema], id_1: String, x_1: Float, y_1: Float) =>
      val overlappingSamples = samples.map {case Row(id_2: String, x_2: Float, y_2: Float) => (id_2, x_2, y_2)}

      if(overlappingSamples.nonEmpty) {
        val stringifiedSamples = overlappingSamples.map(x => s"${x._1}:${x._2}:${x._3}")
        (timestamp, id_1, stringifiedSamples.mkString("&"))
      } else {
        (timestamp, id_1,"", "")
      }

    case Row(timestamp: String, _, id_1: String, x_1: Float, y_1: Float) => // no overlapping samples
      (timestamp, id_1, "", "")

  }
  .write
  .csv(outputPath)

Я попробовал этот код (используя меньшие наборы данных), и он дал результаты, которые я ищу.Проблема в том, что он становится очень медленным, когда я запускаю его для больших наборов данных.Я прочитал, что мне нужно настроить количество разделов через --conf spark.sql.shuffle.partitions=5000, но это не решило проблему.

1 Ответ

0 голосов
/ 21 декабря 2018

Проблема, которую я вижу в приведенном выше запросе, заключается в том, что слишком много операций тасования связаны друг с другом.Я не проверял фактическую логику объединения, но в искре есть общая проблема, которую необходимо обработать.

По моему мнению, когда выполнение DAG становится длинным в SPARK, оно становится немного хрупким.Причина в том, что любой сбой на первом этапе требует пересчета всей группы доступности базы данных.

Стратегия, которую я выбрал, состоит в том, чтобы разбить группу доступности базы данных в нескольких небольших группах доступности базы данных, сохранив результат каждого соединения.

val result = datasetA.join(datasetB).persist()
result.count // forces the materialization
// use the result variable in other join

Здесь подсчет является обязательным, поскольку, как и при других операциях, «spark-persist» является ленивым и требует явного действия (count) для принудительного объединения и материализации результата.

Вы можете попробоватьто же самое для вашей работы и проверить производительность.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...