Вычтите два массива, чтобы получить новый массив в Pyspark - PullRequest
0 голосов
/ 24 апреля 2019

Я новичок в Spark. Я могу суммировать, вычитать или умножать массивы в Python Pandas & Numpy. Но мне трудно делать что-то подобное в Spark (python). Я на Databricks.

Например, такой подход дает огромное сообщение об ошибке, которое я не хочу копировать и вставлять здесь:

differencer=udf(lambda x,y: x-y, ArrayType(FloatType()))

df.withColumn('difference', differencer('Array1', 'Array2'))

Схема выглядит так:

root
 |-- col1: integer (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- num: integer (nullable = true)
 |-- part: integer (nullable = true)
 |-- result: integer (nullable = true)
 |-- Array1: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- Array2: array (nullable = false)
 |    |-- element: float (containsNull = true)

Я просто хочу создать новый столбец, вычитая эти 2 столбца массива. На самом деле, я получу RMSE между ними. Но я думаю, что справлюсь с этим, когда научусь понимать эту разницу.

Массивы выглядят так (я просто набираю несколько целых чисел):

Array1_row1[5, 4, 2, 4, 3] Array2_row1[4, 3, 1, 2, 1]

Таким образом, результирующий массив для row1 должен быть: DiffCol_row1[1, 1, 1, 2, 2]

Спасибо за предложения или указания. Спасибо.

1 Ответ

2 голосов
/ 24 апреля 2019

Можно zip_arrays и transform

from pyspark.sql.functions import expr

df = spark.createDataFrame(
    [([5, 4, 2, 4, 3], [4, 3, 1, 2, 1])], ("array1", "array2")
) 

df.withColumn(
    "array3", 
    expr("transform(arrays_zip(array1, array2), x -> x.array1 - x.array2)")
).show()                                                                         
# +---------------+---------------+---------------+       
# |         array1|         array2|         array3|
# +---------------+---------------+---------------+
# |[5, 4, 2, 4, 3]|[4, 3, 1, 2, 1]|[1, 1, 1, 2, 2]|
# +---------------+---------------+---------------+

Для действительного udf потребуется эквивалентная логика, т.е.

from pyspark.sql.functions import udf

@udf("array<double>")
def differencer(xs, ys):
    if xs and ys:
        return [float(x - y) for x, y in zip(xs, ys)]

df.withColumn("array3", differencer("array1", "array2")).show()
# +---------------+---------------+--------------------+
# |         array1|         array2|              array3|
# +---------------+---------------+--------------------+
# |[5, 4, 2, 4, 3]|[4, 3, 1, 2, 1]|[1.0, 1.0, 1.0, 2...|
# +---------------+---------------+--------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...