Я пытаюсь найти похожие названия компаний, используя spark и R. В настоящее время мой набор данных содержит более 500 миллионов строк, и в будущем он будет только увеличиваться.Количество уникальных названий компаний составляет 75 млн. +.Я запускаю это на AWS EMR с 15 узлами и главным узлом.Настройки искры следующие:
spark.executor.cores: 3
spark.executor.memory: 30g
spark.driver.memory: 80g
spark.driver.cores: 8
spark.memory.fraction: 0.1
spark.executor.memoryOverhead: 5g
Количество узлов: 15 (64 ядра,ОЗУ 488 ГБ и твердотельный накопитель 200 ГБ каждый) число исполнителей на узел: множитель 13 разделов: 19
Чтобы сделать это, я сначала выясняю SOUNDEX для каждого названия компании, а затем выполняю самостоятельное объединение в SOUNDEX иПервые два алфавита каждой компании.После объединения я хотел бы узнать расстояние Левенштейна между двумя названиями компаний (в каждом ряду) и выбрать наиболее популярное, если расстояние находится в определенных пределах, иначе проигнорируйте этот ряд.
Проблемалежит в самосоединении (насколько я знаю, но я был бы рад принять другие рекомендации или альтернативный подход к проблеме)
Я пытался увеличить множитель разделов до 100 или уменьшить искру.executor.cores to 2.
Я также пытался присоединиться к таблице на SOUNDEX и первых трех алфавитах.
Код:
spark + R
dist.frac=0.2
min.dist.float = 0.7
max.dist.float=4
method='dl'
old.col='company_name'
pref = 'fuzzy'
substr.mn = 1
substr.mx = 2
weight=c(1,1,1,1)
new.col = paste(pref, old.col, sep='_')
spark_df =
spark_df %>%
rename(old_col = old.col)
fuzzy_spark_df =
spark_df %>%
group_by(old_col) %>%
summarize(freq = count()) %>%
mutate(character_count = length(old_col)) %>%
mutate(max_dist = pmin(pmax(min.dist.float, dist.frac*character_count), max.dist.float)) %>%
mutate(grpcol = paste(soundex(old_col), '|', substr(old_col, substr.mn, substr.mx))) %>%
filter(character_count >= 2)
fuzzy_spark_df =
fuzzy_spark_df %>%
sdf_persist(persist.type)
fuzzy_spark_df =
fuzzy_spark_df %>%
sdf_repartition(partition_by = c('grpcol'), partitions = n.partitions)
collect(head(fuzzy_spark_df)) # to force compuations
logNote('performing the cross join')
fuzzy_spark_df =
fuzzy_spark_df %>% select(old_col, grpcol) %>%
inner_join(fuzzy_spark_df, by=c('grpcol'), suffix=c('_orig', '_fuzzy'))
fuzzy_spark_df =
fuzzy_spark_df %>%
mutate(distance = levenshtein(old_col_orig, old_col_fuzzy))
fuzzy_spark_df =
fuzzy_spark_df %>%
filter(max_dist >= distance) %>%
group_by(old_col_orig) %>%
filter(freq == max(freq, na.rm = T) & distance == min(distance, na.rm = T)) %>%
arrange(old_col_fuzzy) %>%
filter(row_number() == 1) %>%
select('old_col_orig', 'old_col_fuzzy') %>%
distinct()
spark_df =
spark_df %>%
sdf_broadcast() %>%
left_join(fuzzy_spark_df, by = c('old_col' = 'old_col_orig')) %>%
mutate(old_coenter code herel_fuzzy = ifelse(is.na(old_col_fuzzy) | is.null(old_col_fuzzy), 'blank', old_col_fuzzy)) %>%
rename(!!old.col := 'old_col', !!new.col := 'old_col_fuzzy')
фрейм данных искры spark_df:
company_name join_col
walgreens W123 | wa
walgreen W123 | wa
walmart W123 | wa
cisco C654 | ci
cicso C654 | ci
carta C986 | ca
Выход (внутреннее соединение spark_df на col join_col)
company_name join_col fuzzy_company_name
walgreens W123 | wa walgreen
walgreens W123 | wa walgreens
walgreens W123 | wa walmart
walgreen W123 | wa walgreen
walgreen W123 | wa walgreens
walgreen W123 | wa walmart
walmart W123 | wa walgreen
walmart W123 | wa walgreens
walmart W123 | wa walmart
cisco C654 | ci cisco
cisco C654 | ci cicso
cicso C654 | ci cicso
cicso C654 | ci cisco
carta C986 | ca carta