Умножьте два столбца данных Pyspark разных типов (массив [двойной] против двойного) без бриза - PullRequest
0 голосов
/ 08 января 2020

У меня та же проблема, что и у здесь , но мне нужно решение в pyspark и без бриза.

Например, если мой фрейм данных pyspark выглядит следующим образом:

user    |  weight  |  vec
"u1"    | 0.1      | [2, 4, 6]
"u1"    | 0.5      | [4, 8, 12]
"u2"    | 0.5      | [20, 40, 60]

где вес столбца имеет тип double, а столбец ve c имеет тип Array [Double], я хотел бы получить взвешенную сумму векторов на пользователя, чтобы получить кадр данных, который выглядит следующим образом:

user    |  wsum
"u1"    | [2.2, 4.4, 6.6]
"u2"    | [10, 20, 30]

Для этого я попробовал следующее:

df.groupBy('user').agg((F.sum(df.vec* df.weight)).alias("wsum"))

Но это не удалось, так как столбцы ve c и весовые столбцы имеют разные типы.

Как я могу решить эту ошибку без бриза?

1 Ответ

0 голосов
/ 08 января 2020

В пути, используя функцию более высокого порядка transform, доступную из Spark 2.4:

# get size of vec array
n = df.select(size("vec")).first()[0]

# transform each element of the vec array
transform_expr = "transform(vec, x -> x * weight)"

df.withColumn("weighted_vec", expr(transform_expr)) \
  .groupBy("user").agg(array(*[sum(col("weighted_vec")[i]) for i in range(n)]).alias("wsum"))\
  .show()

Дает:

+----+------------------+
|user|              wsum|
+----+------------------+
|  u1|   [2.2, 4.4, 6.6]|
|  u2|[10.0, 20.0, 30.0]|
+----+------------------+

Для Spark <2.4, используя для понимания умножения каждого элемент в столбце <code>weight, например:

df.withColumn("weighted_vec", array(*[col("vec")[i] * col("weight") for i in range(n)])) \
  .groupBy("user").agg(array(*[sum(col("weighted_vec")[i]) for i in range(n)]).alias("wsum")) \
  .show()
...