Создание массива индикаторов на основе значений столбцов другого фрейма данных в PySpark - PullRequest
0 голосов
/ 22 октября 2018

У меня есть два фрейма данных: df1

+---+-----------------+
|id1|           items1|
+---+-----------------+
|  0|     [B, C, D, E]|
|  1|        [E, A, C]|
|  2|     [F, A, E, B]|
|  3|        [E, G, A]|
|  4|  [A, C, E, B, D]|
+---+-----------------+ 

и df2:

+---+-----------------+
|id2|           items2|
+---+-----------------+
|001|           [A, C]|
|002|              [D]|
|003|        [E, A, B]|
|004|        [B, D, C]|
|005|           [F, B]|
|006|           [G, E]|
+---+-----------------+ 

Я хотел бы создать вектор индикатора (в новом столбце result_array в df1) на основе значений в items2.Вектор должен иметь ту же длину, что и число строк в df2 (в этом примере он должен иметь 6 элементов).Его элементы должны иметь либо значение 1,0, если строка в items1 содержит все элементы в соответствующей строке items2, либо значение 0,0 в противном случае.Результат должен выглядеть следующим образом:

+---+-----------------+-------------------------+
|id1|           items1|             result_array|
+---+-----------------+-------------------------+
|  0|     [B, C, D, E]|[0.0,1.0,0.0,1.0,0.0,0.0]|
|  1|        [E, A, C]|[1.0,0.0,0.0,0.0,0.0,0.0]|
|  2|     [F, A, E, B]|[0.0,0.0,1.0,0.0,1.0,0.0]|
|  3|        [E, G, A]|[0.0,0.0,0.0,0.0,0.0,1.0]|
|  4|  [A, C, E, B, D]|[1.0,1.0,1.0,1.0,0.0,0.0]|
+---+-----------------+-------------------------+

Например, в строке 0 второе значение равно 1,0, поскольку [D] является подмножеством [B, C, D, E], а четвертое значение -1.0, потому что [B, D, C] является подмножеством [B, C, D, E].Все остальные группы элементов в df2 не являются подмножествами [B, C, D, E], поэтому их значения индикатора равны 0,0.

Я пытался создать список всех групп элементов в items2 с использованием collect () и затем применил udf, но мои данные слишком велики (более 10 миллионов строк).

1 Ответ

0 голосов
/ 23 октября 2018

Вы можете действовать следующим образом,

import pyspark.sql.functions as F
from pyspark.sql.types import *

df1 = sql.createDataFrame([
     (0,['B', 'C', 'D', 'E']),
     (1,['E', 'A', 'C']),
     (2,['F', 'A', 'E', 'B']),
     (3,['E', 'G', 'A']),
     (4,['A', 'C', 'E', 'B', 'D'])],
   ['id1','items1'])

df2 = sql.createDataFrame([
     (001,['A', 'C']),
     (002,['D']),
     (003,['E', 'A', 'B']),
     (004,['B', 'D', 'C']),
     (005,['F', 'B']),
     (006,['G', 'E'])],
    ['id2','items2'])

, который дает вам кадры данных,

+---+---------------+
|id1|         items1|
+---+---------------+
|  0|   [B, C, D, E]|
|  1|      [E, A, C]|
|  2|   [F, A, E, B]|
|  3|      [E, G, A]|
|  4|[A, C, E, B, D]|
+---+---------------+

+---+---------+
|id2|   items2|
+---+---------+
|  1|   [A, C]|
|  2|      [D]|
|  3|[E, A, B]|
|  4|[B, D, C]|
|  5|   [F, B]|
|  6|   [G, E]|
+---+---------+

Теперь, crossJoin два кадра данных, которые дают вам декартово произведение df1 с df2.Затем groupby на 'items1' и примените udf, чтобы получить 'result_array'.

get_array_udf = F.udf(lambda x,y:[1.0 if set(z) < set(x) else 0.0 for z in y], ArrayType(FloatType()))

df = df1.crossJoin(df2)\
        .groupby(['id1', 'items1']).agg(F.collect_list('items2').alias('items2'))\
        .withColumn('result_array', get_array_udf('items1', 'items2')).drop('items2')

df.show()

. Это даст вам вывод:

+---+---------------+------------------------------+                            
|id1|items1         |result_array                  |
+---+---------------+------------------------------+
|1  |[E, A, C]      |[1.0, 0.0, 0.0, 0.0, 0.0, 0.0]|
|0  |[B, C, D, E]   |[0.0, 1.0, 0.0, 1.0, 0.0, 0.0]|
|4  |[A, C, E, B, D]|[1.0, 1.0, 1.0, 1.0, 0.0, 0.0]|
|3  |[E, G, A]      |[0.0, 0.0, 0.0, 0.0, 0.0, 1.0]|
|2  |[F, A, E, B]   |[0.0, 0.0, 1.0, 0.0, 1.0, 0.0]|
+---+---------------+------------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...