PySpark: конвертировать RDD в столбец в кадре данных - PullRequest
0 голосов
/ 15 мая 2018

У меня есть искровой фрейм данных, с помощью которого я вычисляю евклидово расстояние между строкой и заданным набором кординатов. Я воссоздаю структурно подобный фрейм данных 'df_vector', чтобы объяснить лучше.

from pyspark.ml.feature import VectorAssembler
arr = [[1,2,3], [4,5,6]]
df_example = spark.createDataFrame(arr, ['A','B','C'])
assembler = VectorAssembler(inputCols=[x for x in df_example.columns],outputCol='features')
df_vector = assembler.transform(df_example).select('features') 

>>> df_vector.show()
+-------------+
|     features|
+-------------+
|[1.0,2.0,3.0]|
|[4.0,5.0,6.0]|
+-------------+

>>> df_vector.dtypes
[('features', 'vector')]

Как видите, столбец features представляет собой вектор. На практике я получаю этот векторный столбец как результат StandardScaler. В любом случае, поскольку мне нужно вычислить евклидово расстояние, я делаю следующее

rdd = df_vector.select('features').rdd.map(lambda r: np.linalg.norm(r-b))

, где

b = np.asarray([0.5,1.0,1.5])

У меня есть все необходимые вычисления, но мне нужен rdd в виде столбца в df_vector. Как мне это сделать?

Ответы [ 2 ]

0 голосов
/ 15 мая 2018

Одним из способов решения проблем производительности может быть использование mapPartitions.Идея заключалась бы в том, чтобы на уровне разделов преобразовать features в массив, а затем вычислить норму для всего массива (таким образом, неявно используя пустую векторизацию).Затем сделайте уборку, чтобы получить желаемую форму.Для больших наборов данных это может повысить производительность:

Вот функция, которая вычисляет норму на уровне раздела:

from pyspark.sql import Row
def getnorm(vectors):
    # convert vectors into numpy array
    vec_array=np.vstack([v['features'] for v in vectors])
    # calculate the norm
    norm=np.linalg.norm(vec_array-b, axis=1)
    # tidy up to get norm as a column
    output=[Row(features=x, norm=y) for x,y in zip(vec_array.tolist(), norm.tolist())]
    return(output)

Применение этого с использованием mapPartitions дает СДР строк, которые затем могут бытьпреобразован в DataFrame:

df_vector.rdd.mapPartitions(getnorm).toDF()
0 голосов
/ 15 мая 2018

Вместо создания нового rdd вы можете использовать UDF:

norm_udf = udf(lambda r: np.linalg.norm(r - b).tolist(), FloatType())
df_vector.withColumn("norm", norm_udf(df.features))

Убедитесь, что на рабочих узлах определено numpy.

...