INPUT:
У меня есть два набора данных:
samples_1
набор данных со следующими столбцами: timestamp, id, x, y
и 500M записей. samples_2
набор данных, имеющий те же столбцы, что и sample_1
, и записи 50M.
ПРИМЕЧАНИЯ:
- В одном наборе данных
timestamp
и id
образуют уникальный ключ каждой записи, т. Е. Могут быть продублированы только timestamp
и id
. - Для всех наборов данных
id
изодин набор данных не может быть реплицирован на другой.Тем не менее, timestamp
может дублироваться в двух наборах данных. - Мой кластер содержит узел драйвера и пять подчиненных узлов, каждый из которых имеет 16 ядер и 64 ГБ ОЗУ.
- Я назначаю 15исполнители для моей работы, у каждого по 5 ядер и 19 ГБ оперативной памяти.
ВОПРОС:
Я пытаюсь сделать следующее: для каждого (timestamp_1, id_1)
кортеж в sample_1
, мне нужно найти все (timestamp_2, id_2, x_2, y_2)
s из sample_2
, где timestamp_1
равно timestamp_2
.
ЧТО Я ПРОБОВАЛ:
samples_2
.withColumn("combined", struct("id", "x", "y"))
.groupBy("timestamp")
.agg(collect_list("combined").as("combined_list"))
.join(samples_2, Seq("timestamp"), "rightouter")
.map {
case Row(timestamp: String, samples: mutable.WrappedArray[GenericRowWithSchema], id_1: String, x_1: Float, y_1: Float) =>
val overlappingSamples = samples.map {case Row(id_2: String, x_2: Float, y_2: Float) => (id_2, x_2, y_2)}
if(overlappingSamples.nonEmpty) {
val stringifiedSamples = overlappingSamples.map(x => s"${x._1}:${x._2}:${x._3}")
(timestamp, id_1, stringifiedSamples.mkString("&"))
} else {
(timestamp, id_1,"", "")
}
case Row(timestamp: String, _, id_1: String, x_1: Float, y_1: Float) => // no overlapping samples
(timestamp, id_1, "", "")
}
.write
.csv(outputPath)
Я попробовал этот код (используя меньшие наборы данных), и он дал результаты, которые я ищу.Проблема в том, что он становится очень медленным, когда я запускаю его для больших наборов данных.Я прочитал, что мне нужно настроить количество разделов через --conf spark.sql.shuffle.partitions=5000
, но это не решило проблему.