Преобразовать значение строки фрейма данных pyspark как отношение других элементов в той же строке - PullRequest
0 голосов
/ 05 мая 2018

Я пытаюсь преобразовать значение строки искрового фрейма как отношение каждого другого значения той же строки. Я планирую сделать это, поддерживая список всех элементов строк и сопоставляя его с отдельными значениями строк. Лучше проиллюстрировано приведенным ниже примером

Входной кадр данных

>>> df = spark.createDataFrame([('1111','1010', 'aaaa'), ('2222','2020', 'bbbb'), ('3333','3030', 'cccc')], ['company_id', 'client_id', 'partner_id'])
>>> df.show()
+----------+---------+----------+
|company_id|client_id|partner_id|
+----------+---------+----------+
|      1111|     1010|      aaaa|
|      2222|     2020|      bbbb|
|      3333|     3030|      cccc|
+----------+---------+----------+

Ожидаемый результат

+------+------------------+
|entity|         relations|
+------+------------------+
|  1111|[1111, 1010, aaaa]|
|  2222|[2222, 2020, bbbb]|
|  3333|[3333, 3030, cccc]|
|  1010|[1111, 1010, aaaa]|
|  2020|[2222, 2020, bbbb]|
|  3030|[3333, 3030, cccc]|
|  aaaa|[1111, 1010, aaaa]|
|  bbbb|[2222, 2020, bbbb]|
|  cccc|[3333, 3030, cccc]|
+------+------------------+

Я реализовал приведенный ниже код и смог достичь ожидаемого результата. Но данные в этом фактическом фрейме данных, как ожидается, будут очень большими, поэтому просто хочу проверить, есть ли лучший подход для решения этой проблемы.

Моя реализация

from functools import reduce
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

def unionAll(*dfs):
   return reduce(DataFrame.unionAll, dfs)

df = spark.createDataFrame([('1111','1010', 'aaaa'), ('2222','2020', 'bbbb'), ('3333','3030', 'cccc')], ['company_id', 'client_id', 'partner_id'])
company_df = df.select(df.company_id.alias('entity'), F.array(df.company_id, df.client_id, df.partner_id).alias('relations'))
client_df = df.select(df.client_id.alias('entity'), F.array(df.company_id, df.client_id, df.partner_id).alias('relations'))
partner_df = df.select(df.partner_id.alias('entity'), F.array(df.company_id, df.client_id, df.partner_id).alias('relations'))
entity_df = unionAll(company_df, client_df, partner_df)
entity_df.show()
+------+------------------+
|entity|         relations|
+------+------------------+
|  1111|[1111, 1010, aaaa]|
|  2222|[2222, 2020, bbbb]|
|  3333|[3333, 3030, cccc]|
|  1010|[1111, 1010, aaaa]|
|  2020|[2222, 2020, bbbb]|
|  3030|[3333, 3030, cccc]|
|  aaaa|[1111, 1010, aaaa]|
|  bbbb|[2222, 2020, bbbb]|
|  cccc|[3333, 3030, cccc]|
+------+------------------+

Ответы [ 2 ]

0 голосов
/ 06 мая 2018

Я улучшил свою предыдущую реализацию, как показано ниже, если кто-то посчитает ее полезной.

from functools import reduce
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
import pyspark.sql.types as T

to_list = F.udf(lambda *x: filter(None, x), T.ArrayType(T.StringType()))
df = spark.createDataFrame([('1111','1010', 'aaaa'), ('2222','2020', 'bbbb'), ('3333','3030', 'cccc')], ['company_id', 'client_id', 'partner_id'])
df = df.withColumn('relations', to_list(df['company_id'], df['client_id'], df['partner_id']))
transformed_df = df.select(F.explode(to_list(df.company_id, df.client_id, df.partner_id)).alias('entity'), df.relations)

transformed_df.show()

+------+------------------+
|entity|          relation|
+------+------------------+
|  1111|[1111, 1010, aaaa]|
|  1010|[1111, 1010, aaaa]|
|  aaaa|[1111, 1010, aaaa]|
|  2222|[2222, 2020, bbbb]|
|  2020|[2222, 2020, bbbb]|
|  bbbb|[2222, 2020, bbbb]|
|  3333|[3333, 3030, cccc]|
|  3030|[3333, 3030, cccc]|
|  cccc|[3333, 3030, cccc]|
+------+------------------+
0 голосов
/ 06 мая 2018

Пожалуйста, попробуйте это. Вы просто должны сделать еще один столбец с этими списками, вот и все. Тогда вы даже можете отбросить все, что захотите. Это вычислительно дешевле, чем ваш код:

df = spark.createDataFrame([('1111','1010', 'aaaa'), ('2222','2020', 'bbbb'), ('3333','3030', 'cccc')], ['company_id', 'client_id', 'partner_id'])

df = df.withColumn('relation', [df['company_id'], df['client_id'], df['partner_id']])
...