Удаление вложенных циклов и позволить Spark сделать это за вас, должно значительно повысить производительность. Это требует двух шагов, выраженных здесь как функции.
Первый шаг: собрать уникальные значения в каждом столбце в массиве и транспонировать кадр данных.
from pyspark.sql import functions as F
def unique_and_transpose(df):
df = df.select([F.collect_set(col).alias(col) for col in df.columns])
params = []
for col in df.columns:
params.extend([F.lit(col), col])
return df.select(F.explode(F.create_map(*params)).alias('column', 'values'))
Есливсе столбцы гарантированно не содержат повторяющихся значений, F.collect_set(col)
можно заменить на F.collect_array(col)
. Сбор только уникальных значений не является строго необходимым, но он может ускорить второй шаг.
Что эта функция лучше всего иллюстрирует на примере:
>>> df1.show()
+------+-----+----+
| col1| col2|col3|
+------+-----+----+
| red| one| val|
| green| two| 2|
| blue|three| sda|
| black| nine| 452|
|purple| ten| rww|
+------+-----+----+
>>> unique_and_transpose(df1).show(3, False)
+------+---------------------------------+
|column|values |
+------+---------------------------------+
|col3 |[sda, 452, rww, 2, val] |
|col1 |[blue, green, red, black, purple]|
|col2 |[nine, one, three, two, ten] |
+------+---------------------------------+
Второй шаг: создайте декартово произведение из транспонированных наборов данных и выведите искомое количество.
def cross_relate(df1, df2):
return df1.alias('df1').crossJoin(df2.alias('df2')).select(
F.col('df1.column').alias('col_1'),
F.col('df2.column').alias('col_2'),
F.size(F.array_intersect('df1.values', 'df2.values')).alias('nvals')
)
декартово произведение делает то же, что и две вложенные циклы, но оно работает только по строкам, поэтому возникает необходимостьчтобы сначала транспонировать наборы данных.
С помощью этих двух функций вы можете подсчитать количество уникальных общих значений для каждой пары столбцов, например:
df1_ut = unique_and_transpose(df1).cache()
df2_ut = unique_and_transpose(df2).cache()
df = cross_relate(df1_ut, df2_ut)
Результат:
>>> df.show()
+-----+-----+-----+
|col_1|col_2|nvals|
+-----+-----+-----+
| col3|col10| 0|
| col3| col9| 0|
| col3|col11| 3|
| col1|col10| 0|
| col1| col9| 3|
| col1|col11| 1|
| col2|col10| 2|
| col2| col9| 0|
| col2|col11| 0|
+-----+-----+-----+
Вы хотите словарь, так что это еще один шаг:
res = {f"{row.col_1},{row.col_2}": row.nvals for row in df.collect()}
>>> from pprint import pprint
>>> pprint(res)
{'col1,col10': 0,
'col1,col11': 1,
'col1,col9': 3,
'col2,col10': 2,
'col2,col11': 0,
'col2,col9': 0,
'col3,col10': 0,
'col3,col11': 3,
'col3,col9': 0}