PySpark Создать новый столбец из преобразований в другом фрейме данных - PullRequest
0 голосов
/ 05 июня 2018

В поисках более функционального и вычислительно эффективного подхода в PySpark ->

У меня есть основная таблица (содержащая миллиарды строк), столбцы интересов:

id - (String),

токены - (Массив (строка)) - ex, ['alpha', 'beta', 'gamma']

- (Называя это dataframe, df1)

У меня есть еще одна сводная таблица, которая содержит 25 лучших токенов, таких как:

- (Называя его dataframe, df2) Пример:

Токен

Альфа

Beta

Zi

Mu

Теперь к этой второй таблице (или фрейму данных) я хочу добавить строку, содержащую список идентификаторов для этого токена изпервой таблицы, так что результат выглядит следующим образом:

Идентификаторы токенов

Альфа [1, 2, 3]

Бета [3, 5, 6, 8, 9]

Зи [2, 8, 12]

Му [1, 15, 16, 17]

Нынешний подход:

Из df2 определите различные токены и сохраните их в виде списка (скажем, l1).

(For every token from list, l1):
    Filter df1 to extract the unique ids as a list, call it l2
    Add this new list (l2) as a new column (Ids) to the dataframe (df2) to create a new dataframe (df3)
    persist df3 to a table

I agree, это ужасный подход, и для любого данного l1 с 100k записями он будет работать вечно.Может кто-нибудь помочь мне переписать код (для Pyspark)

Ответы [ 2 ]

0 голосов
/ 12 августа 2019

В качестве альтернативы вы можете попытаться объединить обе таблицы в новом столбце, который по существу будет содержать только токены, взорванные в отдельных строках.Это было бы полезно как с точки зрения вычислительной эффективности, выделенных ресурсов и необходимого времени обработки.

Кроме того, есть несколько привилегий соединения "в коробке", включая "соединение на стороне карты", которое будет способствовать продвижению вашего дела.

0 голосов
/ 06 июня 2018

Explode столбец массива tokens из df1, а затем join с df2 (левое соединение) с нижним регистром токенов и токеном , а затем groupBy токеном и сборомid s как установлено

from pyspark.sql import functions as f
#exolode tokens column for joining with df2
df1 = df1.withColumn('tokens', f.explode('tokens'))

#left join with case insensitive and collecting ids as set for each token
df2.join(df1, f.lower(df1.tokens) == f.lower(df2.token), 'left')\
    .groupBy('token')\
    .agg(f.collect_set('id').alias('ids'))\
    .show(truncate=False)

Я надеюсь, что ответ полезен

...