I have a solution that I use to load a large number of files with incompatible schemas. It is adapted from code I use for parquet files, so there may be small mistakes in it, but it gives you the idea.
It cannot really be optimized, since it loops over all the csv files and reads the first row of each to determine the list of columns.
I use the following functions:
import csv
import logging

from pyspark.sql import types as pysqlt
from pyspark.sql.functions import lit

def get_schemas(path_list):
    """
    Identify schemas from the csv files contained in the path list and group paths with the same schema.
    Input  : path_list : list of paths containing csv files
    Output : dictionary containing one entry per distinct schema, with the list of paths corresponding to that schema
    """
    schemas = {}
    i = 0
    for path in path_list:
        # Get the first row (the header) to identify the schema
        try:
            with open(path, newline="") as f:
                reader = csv.reader(f, delimiter=',', escapechar='#')
                schema = next(reader)
            not_assigned = True
            # If the schema already exists in the dictionary, append the path to the path list of that schema
            for k, v in schemas.items():
                if schema == v['schema']:
                    schemas[k]['paths'].append(path)
                    not_assigned = False
                    break
            # If the schema does not exist in the dictionary, create a new entry
            if not_assigned:
                i += 1
                schemas[str(i)] = {'schema': schema, 'paths': [path]}
        except Exception:
            logging.info('Impossible to get schema for %s', path)
    return schemas
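To make the output concrete, here is a hypothetical example of what get_schemas returns for three files where two share the same header (the file names and columns are made up for illustration):

    schemas = {
        '1': {'schema': ['id', 'name'],
              'paths': ['/mnt/Product/a.csv', '/mnt/Product/b.csv']},
        '2': {'schema': ['id', 'name', 'price'],
              'paths': ['/mnt/Product/c.csv']},
    }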
# Function from https://stackoverflow.com/questions/39758045/how-to-perform-union-on-two-dataframes-with-different-amounts-of-columns-in-spar/42159279#42159279
def harmonize_schemas_and_combine(df_left, df_right):
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
    right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)
    # First go over left-unique fields
    for l_name, l_type, l_nullable in left_fields.difference(right_fields):
        if l_name in right_types:
            r_type = right_types[l_name]
            if l_type != r_type:
                raise TypeError("Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s" % (l_name, l_nullable, not l_nullable))
        df_right = df_right.withColumn(l_name, lit(None).cast(l_type))
    # Now go over right-unique fields
    for r_name, r_type, r_nullable in right_fields.difference(left_fields):
        if r_name in left_types:
            l_type = left_types[r_name]
            if r_type != l_type:
                raise TypeError("Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not r_nullable))
        df_left = df_left.withColumn(r_name, lit(None).cast(r_type))
    # Make sure columns are in the same order
    df_left = df_left.select(df_right.columns)
    return df_left.union(df_right)
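As a quick sanity check, here is a minimal sketch of how the helper behaves on two toy dataframes with different columns (the data and column names are made up, and an active SparkSession named spark is assumed):

    df_a = spark.createDataFrame([(1, "x")], ["id", "name"])
    df_b = spark.createDataFrame([(2, 9.99)], ["id", "price"])
    # 'price' is added to df_a as null, 'name' is added to df_b as null,
    # then the two frames are unioned with matching column order
    combined = harmonize_schemas_and_combine(df_a, df_b)
    combined.show()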
def create_df_path_list(path_list):
    """
    Create a dataframe by concatenating all databases from the path list.
    Input  : path_list : list of paths corresponding to databases
    Output : concatenated dataframe built from all databases in the path list
    """
    # Get all the distinct schemas from path_list
    logging.info('Inferring schemas from the path list')
    schemas = get_schemas(path_list)
    logging.info('The path list contains %s different schemas', len(schemas))
    df = spark.createDataFrame([], pysqlt.StructType([]))
    dict_df = {}
    # Load each set of paths sharing the same schema into a separate dataframe
    for k, v in schemas.items():
        logging.info('Reading %s paths with schema %s', len(v['paths']), k)
        dict_df[k] = (spark.read.format("csv")
                      .option("header", "true")
                      .option("delimiter", ",")
                      .option("escape", "#")
                      .load(v['paths']))
    # Merge all the dataframes
    for k, v in dict_df.items():
        df = harmonize_schemas_and_combine(df, v)
    return df
After that, you just have to call the function:
import os
csv_paths = [os.path.join("/mnt/Product/", file) for file in os.listdir("/mnt/Product/") if file.endswith(".csv")]
df = create_df_path_list(csv_paths)
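As a side note, if you are on Spark 3.1 or later, you can skip the manual harmonization step entirely and let Spark fill in the missing columns with nulls via unionByName. A minimal sketch, assuming the per-schema dataframes have already been loaded into a dict_df dictionary as in create_df_path_list above:

    from functools import reduce

    # allowMissingColumns is available since Spark 3.1
    df = reduce(lambda a, b: a.unionByName(b, allowMissingColumns=True),
                dict_df.values())

The explicit type-conflict checks from harmonize_schemas_and_combine are then left to Spark's own union type resolution.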
It may not be perfect, but I hope it helps!