pyspark-java.lang.IllegalStateException: у входной строки нет ожидаемого числа значений, требуемых схемой - PullRequest
0 голосов
/ 09 сентября 2018

я запускаю pyspark-sql код в песочнице Horton

18/08/11 17:02:22 ИНФОРМАЦИЯ spark.SparkContext: Running Spark версия 1.6.3

# code 
from pyspark.sql import *
from pyspark.sql.types import *
rdd1 = sc.textFile ("/user/maria_dev/spark_data/products.csv")
rdd2 = rdd1.map( lambda x : x.split("," ) )
df1 = sqlContext.createDataFrame(rdd2, ["id","cat_id","name","desc","price", "url"])
df1.printSchema()

root
 |-- id: string (nullable = true)
 |-- cat_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- price: string (nullable = true)
 |-- url: string (nullable = true)

df1.show() 
+---+------+--------------------+----+------+--------------------+
| id|cat_id|                name|desc| price|                 url|
+---+------+--------------------+----+------+--------------------+
|  1|     2|Quest Q64 10 FT. ...|    | 59.98|http://images.acm...|
|  2|     2|Under Armour Men'...|    |129.99|http://images.acm...|
|  3|     2|Under Armour Men'...|    | 89.99|http://images.acm...|
|  4|     2|Under Armour Men'...|    | 89.99|http://images.acm...|
|  5|     2|Riddell Youth Rev...|    |199.99|http://images.acm...|

# When I try to get counts I get the following error.
df1.count()

**Caused by: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 6 fields are required while 7 values are provided.**

# I get the same error for the following code as well
df1.registerTempTable("products_tab")
df_query = sqlContext.sql ("select id, name, desc from products_tab order by name, id ").show();

Я вижу, что столбец desc имеет значение null, но не уверен, нужно ли обрабатывать пустой столбец по-разному при создании фрейма данных и использовании любого метода для него.

Та же ошибка возникает при выполнении SQL-запроса. Кажется, ошибка sql вызвана предложением order by, если я удаляю order by, тогда запрос выполняется успешно.

Пожалуйста, дайте мне знать, если вам нужна дополнительная информация и оцените ответ о том, как справиться с этой ошибкой.

Я попытался увидеть, содержит ли поле имени запятую, как это предложил Чандан Рэй. Там нет запятой в поле имени.

rdd1.count()
=> 1345
rdd2.count()
=> 1345
# clipping id and name column from rdd2
rdd_name = rdd2.map(lambda x: (x[0], x[2]) )
rdd_name.count()
=>1345
rdd_name_comma = rdd_name.filter (lambda x : True if x[1].find(",") != -1  else False )
rdd_name_comma.count()
==> 0

Ответы [ 3 ]

0 голосов
/ 11 сентября 2018

Я обнаружил проблему - это было из-за одной плохой записи, где запятая была встроена в строку. И хотя строка была заключена в двойные кавычки, python разбивает строку на 2 столбца. Я пытался использовать пакет databricks

# from command prompt
pyspark --packages com.databricks:spark-csv_2.10:1.4.0

# on pyspark 
 schema1 = StructType ([ StructField("id",IntegerType(), True), \
         StructField("cat_id",IntegerType(), True), \
         StructField("name",StringType(), True),\
         StructField("desc",StringType(), True),\
         StructField("price",DecimalType(), True), \
         StructField("url",StringType(), True)
         ])

df1 = sqlContext.read.format('com.databricks.spark.csv').schema(schema1).load('/user/maria_dev/spark_data/products.csv')
        df1.show()
df1.show()
    +---+------+--------------------+----+-----+--------------------+
    | id|cat_id|                name|desc|price|                 url|
    +---+------+--------------------+----+-----+--------------------+
    |  1|     2|Quest Q64 10 FT. ...|    |   60|http://images.acm...|
    |  2|     2|Under Armour Men'...|    |  130|http://images.acm...|
    |  3|     2|Under Armour Men'...|    |   90|http://images.acm...|
    |  4|     2|Under Armour Men'...|    |   90|http://images.acm...|
    |  5|     2|Riddell Youth Rev...|    |  200|http://images.acm...|

df1.printSchema()
    root
     |-- id: integer (nullable = true)
     |-- cat_id: integer (nullable = true)
     |-- name: string (nullable = true)
     |-- desc: string (nullable = true)
     |-- price: decimal(10,0) (nullable = true)
     |-- url: string (nullable = true)

df1.count()
     1345
0 голосов
/ 12 сентября 2018

Вот мой взгляд на очистку таких записей, мы обычно сталкиваемся с такими ситуациями:

а. Аномалия в данных, где файл при создании не просматривался, если "," является лучшим разделителем для столбцов.

Вот мое решение по делу:

Решение a: В таких случаях мы хотели бы, чтобы процесс идентифицировался как часть очистки данных, если эта запись является квалифицированной записью. Остальные записи, если они будут перенаправлены в неверный файл / коллекцию, дадут возможность согласовать такие записи.

Ниже приведена структура моего набора данных (product_id, product_name, unit_price)

1,product-1,10
2,product-2,20
3,product,3,30

В приведенном выше случае продукт 3 должен читаться как продукт 3, который мог быть опечаткой при регистрации продукта. В таком случае, приведенный ниже пример будет работать.

>>> tf=open("C:/users/ip2134/pyspark_practice/test_file.txt")
>>> trec=tf.read().splitlines()
>>> for rec in trec:
...   if rec.count(",") == 2:
...      trec_clean.append(rec)
...   else:
...      trec_bad.append(rec)
...
>>> trec_clean
['1,product-1,10', '2,product-2,20']
>>> trec_bad
['3,product,3,30']
>>> trec
['1,product-1,10', '2,product-2,20','3,product,3,30']

Другой вариант решения этой проблемы - попытаться определить, будет ли skipinitialspace = True работать для анализа столбцов.

(Ref: Python анализирует CSV, игнорируя запятую с двойными кавычками )

0 голосов
/ 09 сентября 2018

Полагаю, ваше поле имени содержит запятую, поэтому оно также разделяется. Так что ожидаемо 7 столбцов

Возможно, есть неправильные линии.

Пожалуйста, попробуйте использовать код, указанный ниже, чтобы исключить плохую запись в одном файле

val df = spark.read.format(“csv”).option("badRecordsPath", "/tmp/badRecordsPath").load(“csvpath”)

// он будет читать csv и создавать фрейм данных, если будет какая-либо искаженная запись, он переместит ее в указанный вами путь.

// пожалуйста, прочитайте ниже

https://docs.databricks.com/spark/latest/spark-sql/handling-bad-records.html

...