Найти среднее значение массива pyspark <double> - PullRequest
7 голосов
/ 03 апреля 2019

В pyspark у меня есть массив переменной длины, для которого я хотел бы найти среднее значение.Однако для функции усреднения требуется один числовой тип.

Есть ли способ найти среднее значение массива, не разбирая массив?У меня есть несколько разных массивов, и я хотел бы иметь возможность сделать что-то вроде следующего:

df.select(col("Segment.Points.trajectory_points.longitude"))

DataFrame [долгота: массив]

df.select(avg(col("Segment.Points.trajectory_points.longitude"))).show()
org.apache.spark.sql.AnalysisException: cannot resolve
'avg(Segment.Points.trajectory_points.longitude)' due to data type
mismatch: function average requires numeric types, not
ArrayType(DoubleType,true);;

Если у меня есть 3 уникальные записи со следующими массивами, я хотел бы получить среднее значение этих значений в качестве выходных данных.Это будет 3 средних значения долготы.

Ввод:

[Row(longitude=[-80.9, -82.9]),
 Row(longitude=[-82.92, -82.93, -82.94, -82.96, -82.92, -82.92]),
 Row(longitude=[-82.93, -82.93])]

Выход:

-81.9,
-82.931,
-82.93

Я использую версию 2.1.3.


Разобрать решение:

Итак, у меня все получилось, взорвавшись, но я надеялся избежать этого шага.Вот что я сделал

from pyspark.sql.functions import col
import pyspark.sql.functions as F

longitude_exp = df.select(
    col("ID"), 
    F.posexplode("Segment.Points.trajectory_points.longitude").alias("pos", "longitude")
)

longitude_reduced = long_exp.groupBy("ID").agg(avg("longitude"))

Это успешно взяло среднее значение.Однако, поскольку я буду делать это для нескольких столбцов, мне придется взорвать один и тот же DF несколько раз.Я буду продолжать работать над этим, чтобы найти более чистый способ сделать это.

Ответы [ 2 ]

3 голосов
/ 03 апреля 2019

В вашем случае ваши варианты использования explode или udf. Как вы заметили, explode неоправданно дорого. Таким образом, udf - это путь.

Вы можете написать свою собственную функцию, чтобы взять среднее из списка чисел, или просто отбросить numpy.mean. Если вы используете numpy.mean, вам придется привести результат к float (потому что искра не знает, как обрабатывать numpy.float64 с).

import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

array_mean = udf(lambda x: float(np.mean(x)), FloatType())
df.select(array_mean("longitude").alias("avg")).show()
#+---------+
#|      avg|
#+---------+
#|    -81.9|
#|-82.93166|
#|   -82.93|
#+---------+
2 голосов
/ 22 июня 2019

В последних версиях Spark (2.4 или более поздней) наиболее эффективным решением является использование aggregate функции более высокого порядка:

from pyspark.sql.functions import expr

query = """aggregate(
    `{col}`,
    CAST(0.0 AS double),
    (acc, x) -> acc + x,
    acc -> acc / size(`{col}`)
) AS  `avg_{col}`""".format(col="longitude")

df.selectExpr("*", query).show()
+--------------------+------------------+
|           longitude|     avg_longitude|
+--------------------+------------------+
|      [-80.9, -82.9]|             -81.9|
|[-82.92, -82.93, ...|-82.93166666666667|
|    [-82.93, -82.93]|            -82.93|
+--------------------+------------------+

См. Также Среднечисленное вычисление Spark Scala с обработкой нуля

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...