pyspark - совокупный (сумма) вектор поэлементно - PullRequest
0 голосов
/ 24 января 2019

У меня есть то, что кажется простой проблемой, но я продолжаю биться головой об стену, но безуспешно.По сути, я пытаюсь сделать то же самое, что и в этом сообщении , за исключением того, что мне не важен аспект "группы по" этого сообщения, я просто хочу суммировать по всем строкам .

Чтобы перефразировать связанный пост, DataFrame выглядит следующим образом:

ID,Vec
1,[0,0,5]
2,[3,3,4]
3,[0,8,1]
....

Я хотел бы поэлементно суммировать векторы.

Требуемый результат вышеприведенногоПримером может служить одна строка:

SumOfVectors
[3,11,10]

Другая большая разница в том, что Я использую pyspark , а не Scala.Я попытался заставить rdd.fold() работать, но либо он не работает так же, либо я не могу понять синтаксис в pyspark.

Последнее замечание: я делаю это на кадре данныхСтроки ~ 1MM и вектор длиной ~ 10k, так что это должно быть достаточно эффективно.

Спасибо за любую помощь!Воспроизводимый игрушечный фрейм данных ниже, согласно комментариям.

import numpy as np
from pyspark.ml.linalg import Vectors

n_rows = 100

pdf = np.concatenate([np.array(range(n_rows)), np.random.randn(n_rows), 3*np.random.randn(n_rows)+2, 6*np.random.randn(n_rows)-2]).reshape(n_rows,-1)
dff = map(lambda x: (int(x[0]), Vectors.dense(x[1:])), pdf)

df = spark.createDataFrame(dff,schema=["ID", "Vec"])

df.schema должен выглядеть как StructType(List(StructField(ID,LongType,true),StructField(Vec,VectorUDT,true)))

просто печать df дает мне DataFrame[ID: bigint, Vec: vector]

Также возможно значение, Я на Spark 2.4

$ spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_191
Branch HEAD
Compiled by user ec2-user on 2018-12-07T19:51:27Z
Revision bab859f34a291cb7b3f4e724b59e1b48af69016b
Url git@aws157git.com:/pkg/Aws157BigTop
Type --help for more information.

Ответы [ 2 ]

0 голосов
/ 25 февраля 2019

Я в конце концов понял это (я вру, один из моих коллег понял это для меня), поэтому я опубликую ответ здесь, если у кого-то возникнет такая же проблема.

Вы можете использовать fold аналогично тому, как это делается в примере с scala, связанным с исходным вопросом.Синтаксис в pyspark выглядит так:

# find out how many Xs we're iterating over to establish the range below
vec_df = df.select('Vec')
num_cols = len(vec_df.first().Vec)

# iterate over vector to sum each "column"    
vec_sums = vec_df.rdd.fold([0]*num_cols, lambda a,b: [x + y for x, y in zip(a, b)])

Краткое объяснение: rdd.fold() принимает два аргумента.Первым является массив инициализации, в данном случае [0]*num_cols, который является просто массивом 0.Вторая - это функция, которая применяется к массиву и используется для итерации по каждой строке кадра данных.Так что для каждой строки он делает lambda a,b: [x + y for x, y in zip(a, b)], что просто добавляет эту строку поэлементно к тому, что он вычислил до сих пор.

Вы можете использовать мой код в исходном вопросе, чтобы сгенерировать игрушечный фрейм данных, чтобы проверить это.Надеюсь, это кому-нибудь пригодится.

0 голосов
/ 27 января 2019

Я думаю, вам нужно привести векторный столбец к массиву, прежде чем вы сможете его агрегировать.

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import functions as F
from pyspark.sql import types as T

def vec2array(v):
  v = Vectors.dense(v)
  array = list([float(x) for x in v])
  return array

vec2array_udf = F.udf(vec2array, T.ArrayType(T.FloatType()))

df = df.withColumn('Vec', vec2array_udf('Vec'))

n = len(df.select('Vec').first()[0])
bla = df.agg(F.array(*[F.sum(F.col("Vec")[i]) for i in range(n)]).alias("sum"))
bla.show(truncate=False)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...