Я пытаюсь ускорить и ограничить стоимость взятия нескольких столбцов и их значений и вставки их в карту в одной строке.Это требование, потому что у нас есть устаревшая система, которая читает с этой работы, и она еще не готова к рефакторингу.Существует также еще одна карта с некоторыми данными, которые необходимо объединить с этим.
В настоящее время у нас есть несколько решений, каждое из которых, по-видимому, приводит к примерно одинаковому времени выполнения на одном и том же кластере с около 1 ТБ данных, хранящихся в Parquet:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.json4s._
import org.json4s.jackson.JsonMethods._
import spark.implicits._
def jsonToMap(s: String, map: Map[String, String]): Map[String, String] = {
implicit val formats = org.json4s.DefaultFormats
val jsonMap = if(!s.isEmpty){
parse(s).extract[Map[String, String]]
} else {
Map[String, String]()
}
if(map != null){
map ++ jsonMap
} else {
jsonMap
}
}
val udfJsonToMap = udf(jsonToMap _)
def addMap(key:String, value:String, map: Map[String,String]): Map[String,String] = {
if(map == null) {
Map(key -> value)
} else {
map + (key -> value)
}
}
val addMapUdf = udf(addMap _)
val output = raw.columns.foldLeft(raw.withColumn("allMap", typedLit(Map.empty[String, String]))) { (memoDF, colName) =>
if(colName.startsWith("columnPrefix/")){
memoDF.withColumn("allMap", when(col(colName).isNotNull, addMapUdf(substring_index(lit(colName), "/", -1), col(colName), col("allTagsMap")) ))
} else if(colName.equals("originalMap")){
memoDF.withColumn("allMap", when(col(colName).isNotNull, udfJsonToMap(col(colName), col("allMap"))))
} else {
memoDF
}
}
занимает около 1 часа на9 m5.xlarge
val resourceTagColumnNames = raw.columns.filter(colName => colName.startsWith("columnPrefix/"))
def structToMap: Row => Map[String,String] = { row =>
row.getValuesMap[String](resourceTagColumnNames)
}
val structToMapUdf = udf(structToMap)
val experiment = raw
.withColumn("allStruct", struct(resourceTagColumnNames.head, resourceTagColumnNames.tail:_*))
.select("allStruct")
.withColumn("allMap", structToMapUdf(col("allStruct")))
.select("allMap")
Также выполняется примерно за 1 час на том же кластере
Этот код работает, но он недостаточно быстр, он примерно в 10 раз длиннее любого другого преобразования, которое мы выполняеместь прямо сейчас, и это бутылочное горлышко для нас.
Есть ли другой способ получить более эффективный результат?
Редактировать: Я также пытался ограничить данные ключом, потому что значения в столбцах, которые я объединяю, могут измениться, несмотря на то, что ключ остается прежним, я не могу ограничить размер данных, не рискуя потерять данные.