Как преобразовать числа с плавающей точкой в ​​IntegerType, используя StructType в PySpark? - PullRequest
0 голосов
/ 13 октября 2019

Я пытаюсь создать фрейм данных Spark из фрейма данных Pandas, где я определяю типы данных столбца с помощью класса StructType. Я сохранил фрейм данных pandas как df, а фрейм данных spark - как данные.

Перед тем, как приступить к какому-либо из этих действий, где-то в файле csv произошла ошибка, я использовал параметр метода read_csv в pandas. , error_bad_lines. Я не знаком с искровым эквивалентом.

df = pd.read_csv('Amazon_Responded_Oct05.csv',error_bad_lines=False)
df.head()
>>>>
    user_id_str user_followers_count    text_
0   143515471.0 1503    @AmazonHelp Can you please DM me? A product I ...
1   85741735.0  149569  @SeanEPanjab I'm sorry, we're unable to DM you...
2   143515471.0 1503    @AmazonHelp It was purchased on... 
3   143515471.0 1503    @AmazonHelp I am following you now, if it help...
4   85741735.0  149569  @SeanEPanjab Please give us a call/chat so we ...

Обратите внимание, как столбец user_id_str заполняется значениями с плавающей запятой, т. Е. 143515471.0 Ниже показано, где возникает ошибка.

data_schema = [StructField('user_followers_count',IntegerType(),True),
               StructField('user_id_str',StringType(),True),
               StructField('text',StringType(),True)]
final_struc = StructType(fields=data_schema)

data = spark.createDataFrame(df,schema=final_struc)
>>>>
TypeError: field user_followers_count: IntegerType can not accept object 143515471.0 in type <class 'float'>

I 'мы пытались исправить это со стороны панд безуспешно

df.astype({'user_id_str': 'int','user_followers_count':'int','text_':'str'}).dtypes
df.head(1)
>>>>
    user_id_str user_followers_count    text_
0   143515471.0 1503    @AmazonHelp Can you please DM me? A product I ...

Итак, я применил различные подходы к своей цели, безуспешно создавая фрейм данных Spark с типами данных столбцов, IntegerType, IntegerType, StringType,Я был бы очень признателен за способ принудительного преобразования данных.

Редактировать:

Наконец, я попытался просто начать с Spark;но это также было бесполезно.


data_1 = spark.read.csv('Amazon_Responded_Oct05.csv',schema=final_struc,enforceSchema=True)
data_1.head(5)
>>>>
+--------------------+-----------+----+
|user_followers_count|user_id_str|text|
+--------------------+-----------+----+
|                null|       null|null|
|                null|       null|null|
|                null|       null|null|
|                null|       null|null|
|                null|       null|null|
+--------------------+-----------+----+
only showing top 5 rows

1 Ответ

1 голос
/ 13 октября 2019

Чтобы преобразовать данные из pandas dataframe в pyspark dataframe, попробуйте это

from pyspark.sql import Row
import pandas as pd
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

#create a sample pandas dataframe
data = {'a':['hello', 'hi', 'world'], 'b':[5.0, 6.4, 9.7], 'c':[1,2,3]}
df = pd.DataFrame(data)
'''
    a       b       c
0   hello   5.0     1
1   hi      6.4     2
2   world   9.7     3
'''

#convert second column type to integer
df = df.astype({'b':'int'})
df
'''
    a       b       c
0   hello   5       1
1   hi      6       2
2   world   9       3
'''

#prepare the schema
fields = [StructField('a',StringType(),True),\
               StructField('b',IntegerType(),True),\
               StructField('c',IntegerType(),True)]
schema = StructType(fields)


#convert to a pyspark dataframe
rows = [Row(**_) for _ in df.to_dict(orient='records')]
#[Row(a='hello', b=5, c=1), Row(a='hi', b=6, c=2), Row(a='world', b=9, c=3)]
df_sp = spark.createDataFrame(rows, schema)
df_sp.show()
# +-----+---+---+
# |    a|  b|  c|
# +-----+---+---+
# |hello|  5|  1|
# |   hi|  6|  2|
# |world|  9|  3|
# +-----+---+---+

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...