Я пытаюсь построить kdtree, используя pyspark.Для этого я использую UDF для рекурсивного построения kdtree из двумерного списка с плавающей точкой.
Ниже приведен фрагмент кода, который я пытаюсь выполнить:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import *
spark = SparkSession.builder.appName("SRDD").getOrCreate()
sc = spark.sparkContext
# Some sequence of floats
abc = [[0.0769,0.2982],[0.0863,0.30052],[0.0690,0.33337],[0.11975,0.2984],[0.07224,0.3467],[0.1316,0.2999]]
def build_kdtree(points,depth=0):
n=points.count()
if n<=0:
return None
axis=depth%2
sorted_points=sorted(points,key=lambda point:point[axis])
return{
'point': sorted_points[n/2],
'left':build_kdtree(sorted_points[:n/2],depth+1),
'right':build_kdtree(sorted_points[n/2 + 1:],depth+1)
}
#This is how I'm trying to specify the return type of the function
kdtree_schema=StructType([StructField('point',ArrayType(FloatType()),nullable=True),StructField('left',StructType(),nullable=True),StructField('right',StructType(),nullable=True)])
kdtree_schema=StructType([StructField('point',ArrayType(FloatType()),nullable=True),StructField('left',kdtree_schema,nullable=True),StructField('right',kdtree_schema,nullable=True)])
#UDF registration
buildkdtree_udf=udf(build_kdtree, kdtree_schema)
#Function call
pointskdtree=buildkdtree_udf(abc)
Однако, это вызывает TypeError: Неверный аргумент, а не строка или столбец.
У меня есть 2 основных вопроса:
- Является ли мой подход к построению дерева kd в искре рекурсивно правильным?
- Строки, в которых я указываю тип возврата UDF какkdtree_schema правильно?