PySpark - применить функцию для переразделенных пакетов - PullRequest
1 голос
/ 09 марта 2020

Я делаю нечеткое совпадение строк , используя MinHashLSH и approxSimilarityJoin на 500 миллиардах пар. Он слишком велик для моей текущей настройки кластера, поэтому я хочу запустить его партиями

Я хочу разделить данные и запустить approxSimilarityJoin для каждого раздела итеративно, чтобы мой кластер мог его обработать.

Моя текущая функция:

matched_df = model.stages[-1].approxSimilarityJoin(df1, df2, 1.0, "confidence")

Но я застрял на том, как объединить repartition, foreachPartition и approxSimilarityJoin.

Я думаю, что это должно быть что-то вроде:

df1.repartition(100).foreachPartition(batch : model.stages[-1].approxSimilarityJoin(batch, df2, 1.0, "confidence"))

но у меня неверный синтаксис. Какой правильный синтаксис для foreachPartition?

1 Ответ

1 голос
/ 10 марта 2020

Я не думаю, что вы можете достичь этого, используя foreachParition. foreachParition принимает функцию, которая будет выполняться на исполнителях, и передает в нее фактические данные, а не фрейм данных (это действие, которое будет запускать обработку, например .collect или .write, а не просто определение преобразования). И если вы захотите воссоздать фрейм данных из этого переданного в наборе, это также не сработает, поскольку на самом работнике нет контекста искры. Концептуально dataframe - это не таблица, а ленивое определение преобразования.

Однако вы можете просто разделить df1 с помощью Spark. Если нет ключа, по которому вы можете фильтровать DataFrame, вы можете просто сделать это, используя randomSplit, например:

df.randomSplit((0.1, 0.1, 0.1, 0.1, 0.1), seed=42)

Результатом этой операции является список DataFrames

[DataFrame[date: string, text: string],
 DataFrame[date: string, text: string],
 DataFrame[date: string, text: string],
 DataFrame[date: string, text: string],
 DataFrame[date: string, text: string]]

, по которому вы можете выполнять итерации, используя обычные Python

dfs = df.randomSplit((0.1, 0.1, 0.1, 0.1, 0.1), seed=42)
for df in dfs:
    matched_df = model.stages[-1].approxSimilarityJoin(df, df2, 1.0, "confidence")
    do_something_with(matched_df)

Чтобы разделить ваш набор данных таким образом на 100 частей, вы можете сгенерировать кортеж весов:

df.randomSplit(tuple([0.01 for x in range (100)]), seed=42)
...