У меня есть два источника данных (оба файла csv), один из них является источником входящих данных (2,2 миллиона записей) и источником основных данных (35 миллионов записей).Моя работа состоит в том, чтобы проверить, сколько записей во входящем источнике данных соответствует основному источнику данных, и вывести их.Ключевым моментом здесь является то, что записи являются шумными и требуют нечеткого сопоставления строк вместо точного сопоставления.Мое объединение хорошо работает с небольшими данными, но когда мне приходится делать то же самое для больших данных, оно занимает вечность.
К вашему сведению .. С этим кодом мне потребовалось около 1 часа 40 минут, чтобы выполнить соединениевходящие данные (1 млн. записей) по сравнению с основными данными (3 млн. записей) на 8-ядерном компьютере.
Например.Источник основных данных имеет одну из 35 миллионов записей, как показано ниже
"Markets, Inc.", 1 Bank Plz, Chicago, IL, 60670-0001, IL
Входящие данные имеют одну иззаписи
"Markets Inc", 1 Bank Pl, Chicago, IL, 60670-0001, IL
ниже мой код
def myFunc: (String => String) = {
s =>
if (s.length > 5) {
s.substring(0, 5)
} else s
}
val myUDF = udf(myFunc)
var sourcedata = spark.sqlContext.read.option("header", "true").option("delimiter", "|")
.csv("./src/main/resources/company_address_sample3000000.txt").na.fill("")
.select(col("COMPANY_NAME").alias("NAME1"), concat(col("STREET_ADDR_1"),
col("STREET_ADDR_2")).alias("ADDRESS1"), col("CITY").alias("CITY1"), col("STATE").alias("STATE1"),
myUDF(col("ZIP")).alias("ZIP1"))
.withColumn("Beginswith1", col("NAME1").substr(0, 1)).distinct()
.repartition(col("Beginswith1"), col("NAME1"), col("ADDRESS1"), col("CITY1"), col("STATE1"), col("ZIP1"))
var incomingData = spark.sqlContext.read.option("header", "true").option("delimiter", "|")
.csv("./src/main/resources/common_format_sample1000000.txt")
.select("NAME", "ADDRESS", "CITY", "STATE", "ZIP")
.withColumn("Beginswith", col("NAME").substr(0, 1)).distinct()
.repartition(col("Beginswith"), col("NAME"), col("ADDRESS"), col("CITY"), col("STATE"), col("ZIP"))
def calculate_similarity(str: String, str1: String): Double = {
val dist = new JaroWinkler()
Try {
dist.similarity(str, str1)
} getOrElse (0.0)
}
def myFilterFunction(
nameInp: String, nameRef: String,
addInp: String, addRef: String,
cityInp: String, cityRef: String,
stateInp: String, stateRef: String,
zipInp: String, zipRef: String
) = {
stateInp == stateRef && cityInp == cityRef && calculate_similarity(nameInp, nameRef) > 0.8 && calculate_similarity(addInp, addRef) > 0.8
}
val udf1 = org.apache.spark.sql.functions.udf(myFilterFunction _)
val filter: Column = udf1(
incomingData("NAME"), sourcedata("NAME1"),
incomingData("ADDRESS"), sourcedata("ADDRESS1"),
incomingData("CITY"), sourcedata("CITY1"),
incomingData("STATE"), sourcedata("STATE1"),
incomingData("ZIP"), sourcedata("ZIP1")
)
incomingData.join(sourcedata, incomingData("Beginswith") === sourcedata("Beginswith1") && filter, "left_semi")
.write.csv("./src/main/resources/hihello3-0.8-1m3m.csv")