Как самостоятельно объединить таблицу с более чем 100k + строк на ключ? - PullRequest
0 голосов
/ 04 июля 2019

Я пытаюсь найти похожие названия компаний, используя spark и R. В настоящее время мой набор данных содержит более 500 миллионов строк, и в будущем он будет только увеличиваться.Количество уникальных названий компаний составляет 75 млн. +.Я запускаю это на AWS EMR с 15 узлами и главным узлом.Настройки искры следующие:

spark.executor.cores: 3

spark.executor.memory: 30g

spark.driver.memory: 80g

spark.driver.cores: 8

spark.memory.fraction: 0.1

spark.executor.memoryOverhead: 5g

Количество узлов: 15 (64 ядра,ОЗУ 488 ГБ и твердотельный накопитель 200 ГБ каждый) число исполнителей на узел: множитель 13 разделов: 19

Чтобы сделать это, я сначала выясняю SOUNDEX для каждого названия компании, а затем выполняю самостоятельное объединение в SOUNDEX иПервые два алфавита каждой компании.После объединения я хотел бы узнать расстояние Левенштейна между двумя названиями компаний (в каждом ряду) и выбрать наиболее популярное, если расстояние находится в определенных пределах, иначе проигнорируйте этот ряд.

Проблемалежит в самосоединении (насколько я знаю, но я был бы рад принять другие рекомендации или альтернативный подход к проблеме)

Я пытался увеличить множитель разделов до 100 или уменьшить искру.executor.cores to 2.

Я также пытался присоединиться к таблице на SOUNDEX и первых трех алфавитах.

Код:

spark + R



    dist.frac=0.2
    min.dist.float = 0.7
    max.dist.float=4
    method='dl'
    old.col='company_name'
    pref = 'fuzzy'
    substr.mn = 1
    substr.mx = 2
    weight=c(1,1,1,1)

    new.col = paste(pref, old.col, sep='_')

    spark_df =
        spark_df %>%
        rename(old_col = old.col)

    fuzzy_spark_df =
        spark_df %>%
        group_by(old_col) %>%
        summarize(freq = count()) %>%
        mutate(character_count = length(old_col)) %>%
        mutate(max_dist = pmin(pmax(min.dist.float, dist.frac*character_count), max.dist.float)) %>%
        mutate(grpcol = paste(soundex(old_col), '|', substr(old_col, substr.mn, substr.mx))) %>%
        filter(character_count >= 2)

    fuzzy_spark_df =
        fuzzy_spark_df %>%
        sdf_persist(persist.type)

    fuzzy_spark_df =
        fuzzy_spark_df %>%
        sdf_repartition(partition_by = c('grpcol'), partitions = n.partitions)

    collect(head(fuzzy_spark_df)) # to force compuations

    logNote('performing the cross join')

    fuzzy_spark_df =
        fuzzy_spark_df %>% select(old_col, grpcol) %>%
        inner_join(fuzzy_spark_df, by=c('grpcol'), suffix=c('_orig', '_fuzzy'))

    fuzzy_spark_df =
        fuzzy_spark_df %>%
        mutate(distance = levenshtein(old_col_orig, old_col_fuzzy))

    fuzzy_spark_df =
        fuzzy_spark_df %>%
        filter(max_dist >= distance) %>%
        group_by(old_col_orig) %>%
        filter(freq == max(freq, na.rm = T) & distance == min(distance, na.rm = T)) %>%
        arrange(old_col_fuzzy) %>%
        filter(row_number() == 1) %>%
        select('old_col_orig', 'old_col_fuzzy') %>%
        distinct()

    spark_df =
        spark_df %>%
        sdf_broadcast() %>%
        left_join(fuzzy_spark_df, by = c('old_col' = 'old_col_orig')) %>%
        mutate(old_coenter code herel_fuzzy = ifelse(is.na(old_col_fuzzy) | is.null(old_col_fuzzy), 'blank', old_col_fuzzy)) %>%
        rename(!!old.col := 'old_col', !!new.col := 'old_col_fuzzy')

фрейм данных искры spark_df:

  company_name join_col
  walgreens     W123 | wa
  walgreen      W123 | wa
  walmart       W123 | wa
  cisco         C654 | ci
  cicso         C654 | ci
  carta         C986 | ca

Выход (внутреннее соединение spark_df на col join_col)

   company_name    join_col    fuzzy_company_name
     walgreens     W123 | wa   walgreen
     walgreens     W123 | wa   walgreens
     walgreens     W123 | wa   walmart
     walgreen      W123 | wa   walgreen
     walgreen      W123 | wa   walgreens
     walgreen      W123 | wa   walmart
     walmart       W123 | wa   walgreen
     walmart       W123 | wa   walgreens
     walmart       W123 | wa   walmart
     cisco         C654 | ci   cisco
     cisco         C654 | ci   cicso
     cicso         C654 | ci   cicso
     cicso         C654 | ci   cisco
     carta         C986 | ca   carta
...