pyspark hive_table ошибка манипуляции с данными - PullRequest
0 голосов
/ 25 октября 2019

Я новичок в спарке и пытаюсь поэкспериментировать с MinMaxScaler.
Я работаю со Spark 2.1.1 и пишу в Jupyter
Итак, мои шаги:
1.

from pyspark.sql import SQLContext
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors 
batch1 = sqlContext.sql("SELECT field1 FROM hive_table limit 10000")

Как указано в документации для использования MinMaxScalerмое поле должно быть Vectors.dense. Вот что я делаю дальше.

batch2 = batch1.rdd.map(lambda row: Vectors.dense(row.field1))

После этого шага я ничего не могу сделать с batch2. Например, если я

for record in batch2.collect():
    print(record)

получаю такую ​​ошибку

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 30 in stage 25.0 failed 1 times, most recent failure: Lost task 30.0 in stage 25.0 (TID 389, localhost, executor driver): java.lang.ClassCastException

Что я делаю не так?

ОБНОВЛЕНИЕ
Вот вывод для batch1.printSchema()

root
 |-- field1: integer (nullable = true)

1 Ответ

0 голосов
/ 25 октября 2019

Возможно, ваш столбец row.field1 не является массивом, [править] , однако Vectors.dense работает с отдельными значениями в качестве входных данных, поэтому я предполагаю, что ваш столбец field1 содержит несколько нечисловых записей, вы можетепроверьте его, используя batch2.select(f.col('field1').cast("int")).filter(f.col('field1').isNull()).count()

, см. пример ниже:

df = spark.createDataFrame([(123, 25), (23, 22), (2, 20)], ['c1', 'c2'])
df.show()
df.printSchema()

import pyspark.sql.functions as f

df2 = df.select(f.array('*')).toDF('arr')
df2.show()

from pyspark.ml.linalg import Vectors

rdd = df2.rdd.map(lambda x: Vectors.dense(x.arr))

for record in rdd.collect():
    print(record)  
    +---+---+
    | c1| c2|
    +---+---+
    |123| 25|
    | 23| 22|
    |  2| 20|
    +---+---+

    root
     |-- c1: long (nullable = true)
     |-- c2: long (nullable = true)

    +---------+
    |      arr|
    +---------+
    |[123, 25]|
    | [23, 22]|
    |  [2, 20]|
    +---------+

    [123.0,25.0]
    [23.0,22.0]
    [2.0,20.0]

и с не числовым значением:

spark.createDataFrame([("a123", 25), ("23", 22), ("2", 20)], ['c1', 'c2'])\
.rdd.map(lambda x: Vectors.dense(x.c1)).first()
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 169.0 failed 1 times, most recent failure: Lost task 1.0 in stage 169.0 (TID 485, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
...