применить udf к нескольким столбцам и использовать пустые операции - PullRequest
0 голосов
/ 30 сентября 2019

У меня есть датафрейм с именем result в pyspark, и я хочу применить udf для создания нового столбца, как показано ниже:

result = sqlContext.createDataFrame([(138,5,10), (128,4,10), (112,3,10), (120,3,10), (189,1,10)]).withColumnRenamed("_1","count").withColumnRenamed("_2","df").withColumnRenamed("_3","docs")
@udf("float")
def newFunction(arr):
    return (1 + np.log(arr[0])) * np.log(arr[2]/arr[1])

result=result.withColumn("new_function_result",newFunction_udf(array("count","df","docs")))

количество столбцов, df, docs - все целочисленные столбцы. Но это возвращает

Py4JError: Произошла ошибка при вызове z: org.apache.spark.sql.functions.col. Трассировка: py4j.Py4JException: метод col ([класс java.util.ArrayList]) не существует в py4j.reflection.ReflectionEngine.getMethod (ReflectionEngine.java:318) в py4j.reflection.ReflectionEngine.getMethod (ReflectionEngine.jpg). ) в py4j.Gateway.invoke (Gateway.java:274) в py4j.commands.AbstractCommand.invokeMethod (AbstractCommand.java:132) в py4j.commands.CallCommand.execute (CallCommand.java:79) в py4j.GatewayConnection.un(GatewayConnection.java:214) на java.lang.Thread.run (Thread.java:748)

Когда я пытаюсь передать один столбец и получить квадраты из них, он работает нормально.

Любая помощь приветствуется.

1 Ответ

1 голос
/ 01 октября 2019

Сообщение об ошибке вводит в заблуждение, но пытается сказать вам, что ваша функция не возвращает float. Ваша функция возвращает значение типа numpy.float64, которое вы можете получить с типом VectorUDT (функция: newFunctionVector в примере ниже). Еще один способ использования numpy - приведение типа numpy numpy.float64 к типу с плавающей точкой python (функция: newFunctionWithArray в приведенном ниже примере).

И последнее, но не менее важное: вызывать его необязательно массив , поскольку udfs может использовать более одного параметра (функция: newFunction в приведенном ниже примере).

import numpy as np
from pyspark.sql.functions import udf, array
from pyspark.sql.types import FloatType
from pyspark.mllib.linalg import Vectors, VectorUDT

result = sqlContext.createDataFrame([(138,5,10), (128,4,10), (112,3,10), (120,3,10), (189,1,10)], ["count","df","docs"])

def newFunctionVector(arr):
    return (1 + np.log(arr[0])) * np.log(arr[2]/arr[1])

@udf("float")
def newFunctionWithArray(arr):
    returnValue = (1 + np.log(arr[0])) * np.log(arr[2]/arr[1])
    return returnValue.item()

@udf("float")
def newFunction(count, df, docs):
    returnValue = (1 + np.log(count)) * np.log(docs/df)
    return returnValue.item()


vector_udf = udf(newFunctionVector, VectorUDT())

result=result.withColumn("new_function_result", newFunction("count","df","docs"))

result=result.withColumn("new_function_result_WithArray", newFunctionWithArray(array("count","df","docs")))

result=result.withColumn("new_function_result_Vector", newFunctionWithArray(array("count","df","docs")))

result.printSchema()

result.show()

Вывод:

root 
|-- count: long (nullable = true) 
|-- df: long (nullable = true) 
|-- docs: long (nullable = true) 
|-- new_function_result: float (nullable = true) 
|-- new_function_result_WithArray: float (nullable = true) 
|-- new_function_result_Vector: float (nullable = true)

+-----+---+----+-------------------+-----------------------------+--------------------------+ 
|count| df|docs|new_function_result|new_function_result_WithArray|new_function_result_Vector|
+-----+---+----+-------------------+-----------------------------+--------------------------+ 
|  138|  5|  10|           4.108459|                     4.108459|                  4.108459| 
|  128|  4|  10|           5.362161|                     5.362161|                  5.362161|
|  112|  3|  10|          6.8849173|                    6.8849173|                 6.8849173|
|  120|  3|  10|           6.967983|                     6.967983|                  6.967983|
|  189|  1|  10|          14.372153|                    14.372153|                 14.372153|  
+-----+---+----+-------------------+-----------------------------+--------------------------+
...