Пересечение 2-х фреймов с количеством в PySpark DataBricks - PullRequest
0 голосов
/ 30 августа 2018

Я хочу, чтобы значение пересечения 2 данных (столбцов) совпадало с unique_ID, и сохраняло значение пересечения в new_column-1, а также получало количество данных пересечения в new_column_3. Фрейм данных я дал ниже. Этот код я выполняю в PySpark (DataBricks). Я не знаю, как написать код для пересечения на pyspark. Ваш быстрый ответ / поддержка будут высоко оценены.

         Pos_id  Emp_id skill_list_p skill_list_e
0 0       1     100          [a]    [a, f, d]
  3       1     101          [a]    [a, b, e]
  6       1     102          [a]    [b, d, c]
1 0       2     100       [d, b]    [a, f, d]
  3       2     101       [d, b]    [a, b, e]
  6       2     102       [d, b]    [b, d, c]
3 0       3     100    [c, d, a]    [a, f, d]
  3       3     101    [c, d, a]    [a, b, e]
  6       3     102    [c, d, a]    [b, d, c]
6 0       4     100       [a, b]    [a, f, d]
  3       4     101       [a, b]    [a, b, e]
  6       4     102       [a, b]    [b, d, c]

Ожидаемый вывод прилагается:

    Pos_id  Emp_id  skill_list_p    skill_list_e    Matched Matched_skills_list Matched_Skills
0   0   1   100 ['a']   ['a' 'f' 'd']   1   {'a'}   a
0   3   1   101 ['a']   ['a' 'b' 'e']   1   {'a'}   a
0   6   1   102 ['a']   ['b' 'd' 'c']   0   set()   
1   0   2   100 ['d' 'b']   ['a' 'f' 'd']   1   {'d'}   d
1   3   2   101 ['d' 'b']   ['a' 'b' 'e']   1   {'b'}   b
1   6   2   102 ['d' 'b']   ['b' 'd' 'c']   2   {'d', 'b'}  d,b
3   0   3   100 ['c' 'd' 'a']   ['a' 'f' 'd']   2   {'a', 'd'}  a,d
3   3   3   101 ['c' 'd' 'a']   ['a' 'b' 'e']   1   {'a'}   a
3   6   3   102 ['c' 'd' 'a']   ['b' 'd' 'c']   2   {'c', 'd'}  c,d
6   0   4   100 ['a' 'b']   ['a' 'f' 'd']   1   {'a'}   a
6   3   4   101 ['a' 'b']   ['a' 'b' 'e']   2   {'a', 'b'}  a,b
6   6   4   102 ['a' 'b']   ['b' 'd' 'c']   1   {'b'}   b

Ожидаемый результат

Ответы [ 2 ]

0 голосов
/ 07 сентября 2018

Самый простой способ - использовать udf в pyspark.sql.functions
Вот пример.

from pyspark.sql import functions as F
from pyspark.sql import types as T

# Declare an udf which uses set.interection() in python to find intersection between arrays.
array_intersect = F.udf(lambda r1, r2: list(set(r1).intersection(set(r2))),
                        T.ArrayType(T.StringType()))

# Use the udf we declared before to generate a new column which is the intersection between 
# skill_list_p and skill_list_e
df = df.withColumn('matched_skill_list',
                   array_intersect(F.col('skill_list_p'), F.col('skill_list_e')))

# Calculate the size of the intersection.
df = df.withColumn('matched', F.size(F.col('matched_skill_list')))

# Show the result
print(df.show())
0 голосов
/ 30 августа 2018

Это может помочь подумать об этом с точки зрения того, как можно сделать это в SQL. Датафреймы предназначены для представления в виде таблицы. Описанная цель состоит в создании нового столбца, который является результатом преобразования, примененного к двум существующим столбцам.

В SQL это будет выглядеть как

select "emp_id", transformation("skill_list_p", "skill_list_e") as "common_skills" from ...

При таком подходе я рекомендую вам взглянуть на Пользовательские функции (UDF) , предоставляемые в Apache Spark ™.

...