Создание PySpark DataFrame с несколькими файлами с заголовком и без него в одном каталоге - PullRequest
0 голосов
/ 26 апреля 2018

Я пытаюсь создать фрейм данных из каталога с несколькими файлами.Среди этих файлов только один имеет заголовок.Я хочу использовать параметр схемы infer для создания схемы из заголовка.

Когда я создаю DF, используя один файл, он правильно выводит схему.

flights = spark.read.csv("/sample/flight/flight_delays1.csv",header=True,inferSchema=True)

Но,когда я читаю все файлы в каталоге, он выдает эту ошибку.

flights = spark.read.csv("/sample/flight/",header=True,inferSchema=True)

18/04/21 23:49:18 WARN SchemaUtils: Found duplicate column(s) in the data schema and the partition schema: `11`. You might need to assign different column names.

flights.take(5)

18/04/21 23:49:27 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 476, in take
        return self.limit(num).collect()
      File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 438, in collect
        port = self._jdf.collectToPython()
      File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
      File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 69, in deco
        raise AnalysisException(s.split(': ', 1)[1], stackTrace)
    pyspark.sql.utils.AnalysisException: u"Reference '11' is ambiguous, could be: 11#13, 11#32.;"

Я знаю, что один из обходных путей - удалить строку заголовка и определить схему вручную, есть ли какая-либо другая тактика для использованиявывести схему для одного файла, а затем добавить другие файлы в DF?

Ответы [ 2 ]

0 голосов
/ 27 апреля 2018

Я придумал другой способ.но недостаточно динамичный для огромного количества файлов.Я предпочитаю способ, предложенный @Steven.

df1 = spark.read.csv("/sample/flight/flight_delays1.csv",header=True,inferSchema=True)
df2 = spark.read.schema(df1.schema).csv("/sample/flight/flight_delays2.csv")
df3 = spark.read.schema(df1.schema).csv("/sample/flight/flight_delays3.csv")

complete_df = df1.union(df2).union(df3)
complete_df.count()
complete_df.printSchema()
0 голосов
/ 26 апреля 2018

Я бы посоветовал вам сделать это:

#First, you infer the schema from the file you know
schm_file = spark.read.csv("/sample/flight/file_with_header.csv",header=True,inferSchema=True)
# Then you use the schema to read the other files 
flights = spark.read.csv("/sample/flight/",header=False, mode='DROPMALFORMED',schema = schm_file.schema)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...