Возможно, ваш столбец row.field1
не является массивом, [править] , однако Vectors.dense работает с отдельными значениями в качестве входных данных, поэтому я предполагаю, что ваш столбец field1
содержит несколько нечисловых записей, вы можетепроверьте его, используя batch2.select(f.col('field1').cast("int")).filter(f.col('field1').isNull()).count()
, см. пример ниже:
df = spark.createDataFrame([(123, 25), (23, 22), (2, 20)], ['c1', 'c2'])
df.show()
df.printSchema()
import pyspark.sql.functions as f
df2 = df.select(f.array('*')).toDF('arr')
df2.show()
from pyspark.ml.linalg import Vectors
rdd = df2.rdd.map(lambda x: Vectors.dense(x.arr))
for record in rdd.collect():
print(record)
+---+---+
| c1| c2|
+---+---+
|123| 25|
| 23| 22|
| 2| 20|
+---+---+
root
|-- c1: long (nullable = true)
|-- c2: long (nullable = true)
+---------+
| arr|
+---------+
|[123, 25]|
| [23, 22]|
| [2, 20]|
+---------+
[123.0,25.0]
[23.0,22.0]
[2.0,20.0]
и с не числовым значением:
spark.createDataFrame([("a123", 25), ("23", 22), ("2", 20)], ['c1', 'c2'])\
.rdd.map(lambda x: Vectors.dense(x.c1)).first()
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 169.0 failed 1 times, most recent failure: Lost task 1.0 in stage 169.0 (TID 485, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):