применить np.sign к серии pyspark, не работающей, даже используя udf - PullRequest
0 голосов
/ 18 января 2020

В настоящее время я пытаюсь преобразовать все значения строки в определенный знак, используя numpy встроенную функцию np.sign

Мой код:

import numpy as np
pd_dataframe = pd.DataFrame({'id': [i for i in range(10)],
                             'values': [10,5,3,-1,0,-10,-4,10,0,10]})

sp_dataframe = spark.createDataFrame(pd_dataframe)
sign_acc_row = F.udf(lambda x: np.sign([x]), IntegerType())
sp_dataframe = sp_dataframe.withColumn('sign', sign_acc_row('values'))
sp_dataframe.show()

Ошибки:

y4JJavaError: An error occurred while calling o2586.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 320.0 failed 1 times, most recent failure: Lost task 0.0 in stage 320.0 (TID 3199, localhost, executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct)

Ожидаемый результат:

    id  values  sign
0   0   10  1
1   1   5   1
2   2   3   1
3   3   -1  -1
4   4   0   0
5   5   -10 -1
6   6   -4  -1
7   7   10  1
8   8   0   0
9   9   10  1

Дополнительный вопрос, если разрешено:

Я хотел создать еще один столбец, для которого он возвращает дополнительно 1, если значение отличается от предыдущая строка.

Ожидаемый результат:

    id  values  sign    numbering
0   0   10  1   1
1   1   5   1   1
2   2   3   1   1
3   3   -1  -1  2
4   4   0   0   3
5   5   -10 -1  4
6   6   -4  -1  4
7   7   10  1   5
8   8   0   0   6
9   9   10  1   7

1 Ответ

1 голос
/ 18 января 2020

Вы почти у цели. np.sign возвращает объект numpy.int64, который не понят pyspark. Чтобы сделать их совместимыми, вы можете сделать:

sign_acc_row = F.udf(lambda x: int(np.sign(x)), IntegerType())
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...