Объединение строк в один столбец структуры в Spark Scala имеет проблемы с эффективностью, как мы можем сделать это лучше? - PullRequest
0 голосов
/ 23 октября 2018

Я пытаюсь ускорить и ограничить стоимость взятия нескольких столбцов и их значений и вставки их в карту в одной строке.Это требование, потому что у нас есть устаревшая система, которая читает с этой работы, и она еще не готова к рефакторингу.Существует также еще одна карта с некоторыми данными, которые необходимо объединить с этим.

В настоящее время у нас есть несколько решений, каждое из которых, по-видимому, приводит к примерно одинаковому времени выполнения на одном и том же кластере с около 1 ТБ данных, хранящихся в Parquet:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.json4s._
import org.json4s.jackson.JsonMethods._
import spark.implicits._

def jsonToMap(s: String, map: Map[String, String]): Map[String, String] = { 
  implicit val formats = org.json4s.DefaultFormats
    val jsonMap = if(!s.isEmpty){
      parse(s).extract[Map[String, String]]
    } else {
      Map[String, String]()
    }
    if(map != null){
      map ++ jsonMap
    } else {
      jsonMap
    }
  }
val udfJsonToMap = udf(jsonToMap _)

def addMap(key:String, value:String, map: Map[String,String]): Map[String,String] = {
  if(map == null) {
    Map(key -> value)
  } else {
    map + (key -> value)
  }
}

val addMapUdf = udf(addMap _)

val output = raw.columns.foldLeft(raw.withColumn("allMap", typedLit(Map.empty[String, String]))) { (memoDF, colName) =>
    if(colName.startsWith("columnPrefix/")){
        memoDF.withColumn("allMap", when(col(colName).isNotNull, addMapUdf(substring_index(lit(colName), "/", -1), col(colName), col("allTagsMap")) ))
    } else if(colName.equals("originalMap")){
        memoDF.withColumn("allMap", when(col(colName).isNotNull, udfJsonToMap(col(colName), col("allMap"))))
    } else {
      memoDF
    }
}

занимает около 1 часа на9 m5.xlarge

val resourceTagColumnNames = raw.columns.filter(colName => colName.startsWith("columnPrefix/"))
def structToMap: Row => Map[String,String] = { row =>
  row.getValuesMap[String](resourceTagColumnNames)
}
val structToMapUdf = udf(structToMap)

val experiment = raw
  .withColumn("allStruct", struct(resourceTagColumnNames.head, resourceTagColumnNames.tail:_*))
  .select("allStruct")
  .withColumn("allMap", structToMapUdf(col("allStruct")))
  .select("allMap")

Также выполняется примерно за 1 час на том же кластере

Этот код работает, но он недостаточно быстр, он примерно в 10 раз длиннее любого другого преобразования, которое мы выполняеместь прямо сейчас, и это бутылочное горлышко для нас.

Есть ли другой способ получить более эффективный результат?

Редактировать: Я также пытался ограничить данные ключом, потому что значения в столбцах, которые я объединяю, могут измениться, несмотря на то, что ключ остается прежним, я не могу ограничить размер данных, не рискуя потерять данные.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...