PySpark: как указать столбец с запятой в качестве десятичной - PullRequest
0 голосов
/ 08 октября 2018

Я работаю с PySpark и загружаю файл csv.У меня есть столбец с числами в европейском формате, что означает, что точка заменяет запятую, и наоборот.

Например: у меня есть 2.416,67 вместо 2,416.67.

My data in .csv file looks like this -    
ID;    Revenue
21;    2.645,45
23;   31.147,05
.
.
55;    1.009,11

В пандах такой файл легко прочитать, указав параметры decimal=',' и thousands='.' внутриpd.read_csv() для чтения европейских форматов.

Код панд:

import pandas as pd
df=pd.read_csv("filepath/revenues.csv",sep=';',decimal=',',thousands='.')

Я не знаю, как это можно сделать в PySpark.

PySpark код:

from pyspark.sql.types import StructType, StructField, FloatType, StringType
schema = StructType([
            StructField("ID", StringType(), True),
            StructField("Revenue", FloatType(), True)
                    ])
df=spark.read.csv("filepath/revenues.csv",sep=';',encoding='UTF-8', schema=schema, header=True)

Кто-нибудь может подсказать, как мы можем загрузить такой файл в PySpark, используя вышеупомянутую функцию .csv()?

Ответы [ 2 ]

0 голосов
/ 08 февраля 2019

Убедитесь, что ваша таблица SQL предварительно отформатирована, чтобы читать NUMERIC вместо INTEGER. У меня были большие проблемы, когда я пытался выяснить все о кодировании и различных форматах точек и запятых и т. Д. И вв конце проблема была гораздо более примитивной, она была предварительно отформатирована для чтения только чисел INTEGER, и, следовательно, никакие десятичные дроби никогда не будут приняты, независимо от запятых или точек.Затем мне просто пришлось изменить таблицу SQL, чтобы вместо нее принимать действительные числа (NUMERIC), и все.

0 голосов
/ 09 октября 2018

Вы не сможете прочитать его как число с плавающей запятой из-за формата данных.Вам нужно прочитать это как строку, очистить и затем привести к плавающему:

from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import FloatType

df = spark.read.option("headers", "true").option("inferSchema", "true").csv("my_csv.csv", sep=";")
df = df.withColumn('revenue', regexp_replace('revenue', '\\.', ''))
df = df.withColumn('revenue', regexp_replace('revenue', ',', '.'))
df = df.withColumn('revenue', df['revenue'].cast("float"))

Вы, вероятно, можете просто объединить их все вместе:

df = spark.read.option("headers", "true").option("inferSchema", "true").csv("my_csv.csv", sep=";")
df = (
         df
         .withColumn('revenue', regexp_replace('revenue', '\\.', ''))
         .withColumn('revenue', regexp_replace('revenue', ',', '.'))
         .withColumn('revenue', df['revenue'].cast("float"))
     )

Обратите внимание, это яЯ не проверял это, так что там может быть одна или две опечатки.

...