PySpark делит массив данных на float - PullRequest
1 голос
/ 28 апреля 2019

У меня есть датафрейм dfDistance. Образец:

    DataIndex CenterIndex   distances           array
        65        0          115.63     [115.63,115.01,114.14]
        65        1          115.01     [115.63,115.01,114.14]
        65        2          114.14     [115.63,115.01,114.14]

Я хочу создать новый столбец, равный поэлементному делению значений в array на соответствующее значение в distances. Я пробовал следующее:

temp = dfDistance.select("DataIndex",   "CenterIndex", "distances", (np.divide(dfDistance.array, dfDistance.distances)))

Это дало мне эту ошибку:

"cannot resolve '(`array` / `distances`)' due to data type mismatch: differing types in '(`array` / `distances`)' (array<float> and float).

Однако, когда я запускаю это:

    a = [115.63,115.01,114.14]
    b= 115.63
    print(np.divide([115.63,115.01,114.14], 115.63))

Это работает и дает мне такой результат: [ 1. 0.99463807 0.98711407]. Почему это не работает в случае PySpark, и как мне изменить мой код, чтобы он работал?

1 Ответ

1 голос
/ 28 апреля 2019

Причина, по которой он работает снаружи, заключается в том, что вы работаете с нативными типами Python (list и float).С другой стороны, в PySpark вы работаете с объектами столбцов , которые не работают одинаково.

В любом случае, самый простой способ сделать это, я думаю, будетбыть с UDF.Я попытался просмотреть документацию PySpark, но не смог найти странного способа действовать с массивом напрямую.Пример:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType

def normalise(a, dist):
    return [element / dist for element in a]

dfDistance.withColumn('normalised', F.udf(normalise, ArrayType(DoubleType()))(df['array'], df['distances']))

С другой стороны, если вы хотите нормализованную сумму, вы можете использовать explode:

distance_sum = dfDistance.select('array', F.explode('array')).groupby('array').sum()

dfDistance.join(distance_sum, on='array', how='left').withColumn('normalised_sum', F.col('sum(col)') / F.col('distances')).drop('sum(col)')
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...