чтение csv из pyspark с указанием неверных типов схемы - PullRequest
0 голосов
/ 01 декабря 2018

Я пытаюсь вывести csv из pyspark df и затем повторно вводить его, но когда я задаю схему, для столбца, являющегося массивом, он говорит, что некоторые строки False.

Вот мой df

   avg(rating)  belongs_to_collection    budget  \
0     2.909946                  False   5000000   
1     3.291962                  False  18000000   
2     3.239811                  False   8000000   
3     3.573318                  False   1500000   
4     3.516590                  False  40000000   

                                      genres original_language  
0                       ['Drama', 'Romance']                en  
1                                 ['Comedy']                en  
2                        ['Drama', 'Family']                en  
3  ['Crime', 'Drama', 'Mystery', 'Thriller']                en  
4             ['Crime', 'Drama', 'Thriller']                en  

Я впервые вывел на csv: df.drop('id').toPandas().to_csv('mergedDf.csv',index=False)

Я попытался прочитать, используя df = spark.read.csv('mergedDf.csv',schema=schema), но я получаю эту ошибку: 'CSV data source does not support array<string> data type.;'

Итак, я попытался прочитать из pandas и затем преобразовать в spark df, но он говорит мне, что столбец, содержащий список, имеет логическое значение.

df = pd.read_csv('mergedDf.csv')
df = spark.createDataFrame(df,schema=schema)
TypeError: field genres: ArrayType(StringType,true) can not accept object False in type <class 'bool'>

Однако, когда я проверяю, есть ли некоторыеиз строк == в False, я считаю, что ни один из них не является.

Я проверил: df[df['genres']=="False"] и df[df['genres']==False]

1 Ответ

0 голосов
/ 01 декабря 2018

К сожалению, функция spark read csv пока не поддерживает сложные типы данных, такие как «массив».Вы бы обработали логику приведения столбца строки в столбец массива

Используйте pandas для записи фрейма данных spark в виде csv с заголовком.

df.drop('id').toPandas().to_csv('mergedDf.csv',index=False,header=True)
df1 = spark.read.option('header','true').option("inferSchema","true").csv('mergedDf.csv')
df1.printSchema()
df1.show(10,False)

Когда вы читаете csv с помощью spark, столбец массива будет преобразован в строковый тип

root
 |-- avg(rating): double (nullable = true)
 |-- belongs_to_collection: boolean (nullable = true)
 |-- budget: integer (nullable = true)
 |-- genres: string (nullable = true)
 |-- original_language: string (nullable = true)

+-----------+---------------------+--------+-----------------------------------------+-----------------+
|avg(rating)|belongs_to_collection|budget  |genres                                   |original_language|
+-----------+---------------------+--------+-----------------------------------------+-----------------+
|2.909946   |false                |5000000 |['Drama', 'Romance']                     |en               |
|3.291962   |false                |18000000|['Comedy']                               |en               |
|3.239811   |false                |8000000 |['Drama', 'Family']                      |en               |
|3.573318   |false                |1500000 |['Crime', 'Drama', 'Mystery', 'Thriller']|en               |
|3.51659    |false                |40000000|['Crime', 'Drama', 'Thriller']           |en               |
+-----------+---------------------+--------+-----------------------------------------+-----------------+

Разделите столбец строки, чтобы создать массив, чтобы получить исходный формат.

df2 = df1.withColumn('genres',split(regexp_replace(col('genres'), '\[|\]',''),',').cast('array<string>'))
df2.printSchema()

.

root
 |-- avg(rating): double (nullable = true)
 |-- belongs_to_collection: boolean (nullable = true)
 |-- budget: integer (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- original_language: string (nullable = true)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...