Как улучшить производительность моих spark-sql-joins - PullRequest
0 голосов
/ 03 января 2019

У меня есть два источника данных (оба файла csv), один из них является источником входящих данных (2,2 миллиона записей) и источником основных данных (35 миллионов записей).Моя работа состоит в том, чтобы проверить, сколько записей во входящем источнике данных соответствует основному источнику данных, и вывести их.Ключевым моментом здесь является то, что записи являются шумными и требуют нечеткого сопоставления строк вместо точного сопоставления.Мое объединение хорошо работает с небольшими данными, но когда мне приходится делать то же самое для больших данных, оно занимает вечность.

К вашему сведению .. С этим кодом мне потребовалось около 1 часа 40 минут, чтобы выполнить соединениевходящие данные (1 млн. записей) по сравнению с основными данными (3 млн. записей) на 8-ядерном компьютере.

Например.Источник основных данных имеет одну из 35 миллионов записей, как показано ниже

"Markets, Inc.", 1 Bank Plz, Chicago, IL, 60670-0001, IL

Входящие данные имеют одну иззаписи

"Markets Inc", 1 Bank Pl, Chicago, IL, 60670-0001, IL

ниже мой код

def myFunc: (String => String) = {
      s =>
        if (s.length > 5) {
          s.substring(0, 5)
        } else s
    }
val myUDF = udf(myFunc)
var sourcedata = spark.sqlContext.read.option("header", "true").option("delimiter", "|")
  .csv("./src/main/resources/company_address_sample3000000.txt").na.fill("")
  .select(col("COMPANY_NAME").alias("NAME1"), concat(col("STREET_ADDR_1"),
    col("STREET_ADDR_2")).alias("ADDRESS1"), col("CITY").alias("CITY1"), col("STATE").alias("STATE1"),
    myUDF(col("ZIP")).alias("ZIP1"))
  .withColumn("Beginswith1", col("NAME1").substr(0, 1)).distinct()
  .repartition(col("Beginswith1"), col("NAME1"), col("ADDRESS1"), col("CITY1"), col("STATE1"), col("ZIP1"))
var incomingData = spark.sqlContext.read.option("header", "true").option("delimiter", "|")
  .csv("./src/main/resources/common_format_sample1000000.txt")
  .select("NAME", "ADDRESS", "CITY", "STATE", "ZIP")
  .withColumn("Beginswith", col("NAME").substr(0, 1)).distinct()
  .repartition(col("Beginswith"), col("NAME"), col("ADDRESS"), col("CITY"), col("STATE"), col("ZIP"))

def calculate_similarity(str: String, str1: String): Double = {
  val dist = new JaroWinkler()
  Try {
    dist.similarity(str, str1)
  } getOrElse (0.0)
}

def myFilterFunction(
                      nameInp: String, nameRef: String,
                      addInp: String, addRef: String,
                      cityInp: String, cityRef: String,
                      stateInp: String, stateRef: String,
                      zipInp: String, zipRef: String
                    ) = {
  stateInp == stateRef && cityInp == cityRef && calculate_similarity(nameInp, nameRef) > 0.8 && calculate_similarity(addInp, addRef) > 0.8
}

val udf1 = org.apache.spark.sql.functions.udf(myFilterFunction _)
val filter: Column = udf1(
  incomingData("NAME"), sourcedata("NAME1"),
  incomingData("ADDRESS"), sourcedata("ADDRESS1"),
  incomingData("CITY"), sourcedata("CITY1"),
  incomingData("STATE"), sourcedata("STATE1"),
  incomingData("ZIP"), sourcedata("ZIP1")
)

incomingData.join(sourcedata, incomingData("Beginswith") === sourcedata("Beginswith1") && filter, "left_semi")
  .write.csv("./src/main/resources/hihello3-0.8-1m3m.csv")

1 Ответ

0 голосов
/ 10 января 2019

Перестановка последовательности фильтра соединения значительно сократила время с 1 часа 50 минут до 90 секунд.Хотя это не решение с точки зрения оптимизации SQL, оно послужило моей текущей цели, учитывая мои данные.Я все еще хотел бы посмотреть, если кто-то придумает решение, с точки зрения оптимизации sql.

var sourcedata = spark.sqlContext.read.option("header", "true").option("delimiter", "|")
      .csv("./src/main/resources/company_address.txt").na.fill("")
      .select(col("COMPANY_NAME").alias("NAME1"), concat(col("STREET_ADDR_1"),
        col("STREET_ADDR_2")).alias("ADDRESS1"), col("CITY").alias("CITY1"), col("STATE").alias("STATE1"),
        col("ZIP").alias("ZIP1"))
      .withColumn("Beginswith1", col("NAME1").substr(0, 1))
      .repartition(col("Beginswith1"), col("NAME1"), col("ADDRESS1"), col("CITY1"), col("STATE1"), col("ZIP1"))

var incomingData_Select = spark.sqlContext.read.option("header", "true").option("delimiter", "|")
  .csv("./src/main/resources/common_format.txt")
  .select("NAME", "ADDRESS", "CITY", "STATE", "ZIP")
  .withColumn("Beginswith", col("NAME").substr(0, 1)).distinct()
  .repartition(col("Beginswith"), col("NAME"), col("ADDRESS"), col("CITY"), col("STATE"), col("ZIP"))

def calculate_similarity(str: String, str1: String, str2: String, str3: String): Boolean = {
  val dist = new JaroWinkler()
  Try {
    dist.similarity(str, str1) > 0.8 && dist.similarity(str2, str3) > 0.8
  } getOrElse (false)
}

def myFilterFunction(
                      nameInp: String, nameRef: String,
                      addInp: String, addRef: String
                    ) = {
  calculate_similarity(nameInp, nameRef, addInp, addRef)
}

val sim_udf = org.apache.spark.sql.functions.udf(myFilterFunction _)

val filter: Column = sim_udf(
  incomingData_Select("NAME"), sourcedata("NAME1"),
  incomingData_Select("ADDRESS"), sourcedata("ADDRESS1")
)

val matching_companies = incomingData_Select
  .join(sourcedata, incomingData_Select("STATE") === sourcedata("STATE1") && incomingData_Select("CITY") === sourcedata("CITY1") && incomingData_Select("Beginswith") === sourcedata("Beginswith1") && filter, "left_semi")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...