У меня большой набор данных (скажем, 4 ГБ), который используется в качестве эталонного источника для обработки другого большого набора данных (100-200 ГБ). У меня есть кластер на 30 исполнителей, чтобы сделать это на 10 узлах.Так что для каждого исполнителя у меня есть собственный jvm, верно?Каждый раз он загружает весь набор эталонных данных.И это занимает много времени и неэффективно.Есть ли хороший подход к этому?В настоящее время я храню данные на s3 aws и запускаю все с emr.Может быть, было бы хорошо использовать более элегантное хранилище, которое я мог бы запросить на лету, или развернуть, например, redis как часть моего кластера и передать данные и затем запросить их?
UPD1:
- Плоские данные - это сжатые CSV-файлы на S3, разделенные на 128 МБ.
- Он считывается в набор данных (объединение для сокращения числа разделов с целью распределения данных по меньшему количеству узлов)
val df = sparkSession.sqlContext.read.format("com.databricks.spark.csv")
.option("header", "false")
.schema(schema)
.option("delimiter", ",")
.load(path)
.coalesce(3)
.as[SegmentConflationRef]
Чем мне нужно преобразовать плоские данные в упорядоченный сгруппированный список и поместить в какое-то хранилище значений ключей, в этом случае карту памяти.
val data: Seq[SegmentConflationRef] = ds.collect()
val map = mutable.Map[String, Seq[SegmentConflationRef]]()
data.groupBy(_.source_segment_id).map(c => {
map += (c._1 -> c._2.sortBy(_.source_start_offset_m))
})
После этого я собираюсь выполнить поиск из другого набора данных.
Так что в этом случае я хочу, чтобы карта ссылок была скопирована у каждого исполнителя.Одна проблема заключается в том, как транслировать такую большую карту через узлы, или какой подход лучше подходить?Вероятно, не использовать Spark с самого начала и загружать данные локально из hdfs в каждом исполнителе?