PySpark Объединить данные и считать значения - PullRequest
1 голос
/ 18 октября 2019

У меня есть два разных кадра данных, и я хочу узнать количество пересечений между m столбцами из df1 и n столбцами из df2. Под пересечением я подразумеваю количество уникальных значений, общих для обоих столбцов. Если в df1 есть 10 столбцов, а в df2 - 20 столбцов, то количество пересечений, которое я получу, равно 200. Я здесь использую только PySpark.

В моем случае данные огромны, и я выполнил следующие коды

dict = {}
for a in df1.columns:
    i_u = df1.select(a).distinct()
    i_u = i_u.select(a).collect()
    for b in df2.columns:
            i_b = df2.select(b).distinct()
            i_b = i_b.select(b).collect()
            l = len(list(set(i_u) & set(i_b)))
            str = a + ","+b+","
            dict[str] = l

ИЛИ этот код

dict = {}
for a in df1.columns:
    if not "." in a:
        for b in df2.columns:
            l = df1.join(df2, df1[a] == df2[b], how="inner")
            l = l.select(a).distinct().count()
            str = a + ","+b+","
            dict[str] = l

Или этот

dict = {}
for a in df1.columns:
    i_u = df1.select(a).distinct()
    for b in df2.columns:
            a_u = df2.select(b).distinct()
            l = i_u.join(a_u, i_u[a] == a_u[b], how="inner").count()
            str = a + ","+b+","
            dict[str] = l

Но во всех этих случаях код недостаточно быстр, так как я запускаю два for петли. Я хочу создать этот словарь или любое представление данных, где у меня есть имена двух столбцов и их количество пересечений. Я пытался использовать cache, но все еще недостаточно хорошо.

Набор данных:

df1 = pd.DataFrame({'col1':['red', 'green', 'blue','black','purple'], 'col2': ['one','two','three','nine','ten'], 'col3': ['val','2','sda','452','rww']})

df2 = pd.DataFrame({'col9':['red', 'green', 'pink','orange','purple'], 'col10': ['seven','ten','nine','six','seven'], 'col11': ['val','2','dsrf','452','red']})

df1 = sqlContext.createDataFrame(df1)
df2 = sqlContext.createDataFrame(df2)

выходной словарь должен выглядеть примерно так или в любом другом формате, если есть имена столбцов иих количествоэто может быть в любом формате, я просто забочусь о выводе.

dict = {"col1, col9": 3, "col1, col10": 0, "col1, col11": 1, ... ...}

1 Ответ

2 голосов
/ 19 октября 2019

Удаление вложенных циклов и позволить Spark сделать это за вас, должно значительно повысить производительность. Это требует двух шагов, выраженных здесь как функции.

Первый шаг: собрать уникальные значения в каждом столбце в массиве и транспонировать кадр данных.

from pyspark.sql import functions as F

def unique_and_transpose(df):
    df = df.select([F.collect_set(col).alias(col) for col in df.columns])
    params = []
    for col in df.columns:
        params.extend([F.lit(col), col])
    return df.select(F.explode(F.create_map(*params)).alias('column', 'values'))

Есливсе столбцы гарантированно не содержат повторяющихся значений, F.collect_set(col) можно заменить на F.collect_array(col). Сбор только уникальных значений не является строго необходимым, но он может ускорить второй шаг.

Что эта функция лучше всего иллюстрирует на примере:

>>> df1.show()
+------+-----+----+
|  col1| col2|col3|
+------+-----+----+
|   red|  one| val|
| green|  two|   2|
|  blue|three| sda|
| black| nine| 452|
|purple|  ten| rww|
+------+-----+----+

>>> unique_and_transpose(df1).show(3, False)
+------+---------------------------------+
|column|values                           |
+------+---------------------------------+
|col3  |[sda, 452, rww, 2, val]          |
|col1  |[blue, green, red, black, purple]|
|col2  |[nine, one, three, two, ten]     |
+------+---------------------------------+

Второй шаг: создайте декартово произведение из транспонированных наборов данных и выведите искомое количество.

def cross_relate(df1, df2):
    return df1.alias('df1').crossJoin(df2.alias('df2')).select(
        F.col('df1.column').alias('col_1'),
        F.col('df2.column').alias('col_2'),
        F.size(F.array_intersect('df1.values', 'df2.values')).alias('nvals')
    )

декартово произведение делает то же, что и две вложенные циклы, но оно работает только по строкам, поэтому возникает необходимостьчтобы сначала транспонировать наборы данных.

С помощью этих двух функций вы можете подсчитать количество уникальных общих значений для каждой пары столбцов, например:

df1_ut = unique_and_transpose(df1).cache()
df2_ut = unique_and_transpose(df2).cache()
df = cross_relate(df1_ut, df2_ut)

Результат:

>>> df.show()
+-----+-----+-----+
|col_1|col_2|nvals|
+-----+-----+-----+
| col3|col10|    0|
| col3| col9|    0|
| col3|col11|    3|
| col1|col10|    0|
| col1| col9|    3|
| col1|col11|    1|
| col2|col10|    2|
| col2| col9|    0|
| col2|col11|    0|
+-----+-----+-----+

Вы хотите словарь, так что это еще один шаг:

res = {f"{row.col_1},{row.col_2}": row.nvals for row in df.collect()}

>>> from pprint import pprint
>>> pprint(res)
{'col1,col10': 0,
 'col1,col11': 1,
 'col1,col9': 3,
 'col2,col10': 2,
 'col2,col11': 0,
 'col2,col9': 0,
 'col3,col10': 0,
 'col3,col11': 3,
 'col3,col9': 0}
...