Как оптимизировать поиск содержимого RDD в другом RDD - PullRequest
2 голосов
/ 23 июня 2019

У меня проблема с поиском содержимого СДР в другом СДР.

Этот вопрос отличается от Эффективного сопоставления строк в Apache Spark , так как я ищу точное совпадение и мне не нужны накладные расходы на использование стека ML.

Я новичок в зажигании и хочу знать, какой из этих методов более эффективен или есть другой способ. У меня есть файл с ключевыми словами, как в приведенном ниже примере (в производстве он может достигать 200 строк)

Пример файла ключевых слов

0.47uF 25V X7R 10% -TDK C2012X7R1E474K125AA
20pF-50V NPO/COG - AVX- 08055A200JAT2A

и у меня есть еще один файл (отдельно от табуляции), в котором мне нужно найти совпадения (в производстве у меня до 80 миллионов строк)

C2012X7R1E474K125AA Conn M12 Circular PIN 5 POS Screw ST Cable Mount 5 Terminal 1 Port

Первый метод Я определил UDF и перебрал ключевые слова для каждой строки

keywords = sc.textFile("keys")
part_description = sc.textFile("part_description")


def build_regex(keywords):
    res = '('
    for key in keywords:
        res += '(?<!\\\s)%s(?!\\\s)|' % re.escape(key)
    res = res[0:len(res) - 1] + ')'
    return r'%s' % res


def get_matching_string(line, regex):
    matches = re.findall(regex, line, re.IGNORECASE)
    matches = list(set(matches))
    return list(set(matches)) if matches else None


def find_matching_regex(line):
    result = list()
    for keyGroup in keys:
        matches = get_matching_string(line, keyGroup)
        if matches:
            result.append(str(keyGroup) + '~~' + str(matches) + '~~' + str(len(matches)))
    if len(result) > 0:
        return result


def split_row(list):
    try:
        return Row(list[0], list[1])
    except:
        return None


keys_rdd = keywords.map(lambda keywords: build_regex(keywords.replace(',', ' ').replace('-', ' ').split(' ')))
keys = keys_rdd.collect()

sc.broadcast(keys)

part_description = part_description.map(lambda item: item.split('\t'))
df = part_description.map(lambda list: split_row(list)).filter(lambda x: x).toDF(
    ["part_number", "description"])

find_regex = udf(lambda line: find_matching_regex(line), ArrayType(StringType()))

df = df.withColumn('matched', find_regex(df['part_number']))

df = df.filter(df.matched.isNotNull())

df.write.save(path=job_id, format='csv', mode='append', sep='\t')

Второй метод Я думал, что смогу сделать больше параллельной обработки (вместо циклического перебора ключей, как описано выше). Я сделал декартово произведение между ключами и строками, разбил и разбил ключи, затем сравнил каждый ключ со столбцом детали

df = part_description.cartesian(keywords)

    df = df.map(lambda tuple: (tuple[0].split('\t'), tuple[1])).map(
        lambda tuple: (tuple[0][0], tuple[0][1], tuple[1]))

    df = df.toDF(['part_number', 'description', 'keywords'])

    df = df.withColumn('single_keyword', explode(split(F.col("keywords"), "\s+"))).where('keywords != ""')

    df = df.withColumn('matched_part_number', (df['part_number'] == df['single_keyword']))

    df = df.filter(df['matched_part_number'] == F.lit(True))

    df.write.save(path='part_number_search', format='csv', mode='append', sep='\t')

Это правильные способы сделать это? Что я могу сделать, чтобы обработать эти данные быстрее?

1 Ответ

1 голос
/ 24 июня 2019

Это действительные решения, и я использовал оба в разных обстоятельствах.

Вы передаете меньше данных, используя свой широковещательный подход, отправляя только 200 дополнительных строк каждому исполнителю, а не 200 раз реплицируете каждую строку вашего> 80-метрового файла строк, так что, скорее всего, эта будет быстрее для вас.

Я использовал декартовой подход, когда число записей в моем поиске практически невозможно транслировать (будучи намного, намного больше, чем 200 строк).

В вашей ситуации я бы использовал трансляцию.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...