Spark Кирпичи данных Объединение файлов с небольшим несоответствием в столбцах - PullRequest
0 голосов
/ 21 февраля 2019

Я пытаюсь использовать следующий тип кода для загрузки нескольких файлов в мой Dataframe.

myProductDF = spark.read.format("csv").option("header","true").option("delimiter", ",").option("escape", "#").load("/mnt/Product/*")

Теперь так получилось, что у некоторых файлов есть дополнительный столбец или два.Ex.ProductTable имеет ProductTableX.csv с 10 столбцами и ProductTableY.csv с 11 столбцами.Когда я пытаюсь загрузить все файлы в папке «Продукт» в мой фрейм данных, используя подстановочный знак * вместо указания имен отдельных файлов, он пропускает дополнительный столбец в файлах, которые имеют это.Поскольку есть тысячи файлов для загрузки, невозможно выбрать конкретные варианты.Дайте мне знать, если есть хорошее решение для этого сценария.Будет хорошо, если для файлов, в которых дополнительный столбец недоступен, по умолчанию будет установлено значение NULL, при условии, что он загружает дополнительный столбец в кадр данных со значениями для файлов, в которых он присутствует.

1 Ответ

0 голосов
/ 21 февраля 2019

У меня есть решение, которое я использую для загрузки большого количества несовместимых файлов.Он исходит из кода, который я использую для файлов паркета, поэтому в нем могут быть небольшие ошибки, но он дает вам представление.

Почему-то он не может быть оптимизирован, поскольку циклически перебирает все файлы csv и читает первыестрока для оценки списка столбцов.

Я использую следующие функции:

import csv
from pyspark.sql.functions import lit

def get_schemas(path_list):
    """
    Identify schemas from csv files contained in the path list and regroup path with the same schema
    Input : - path_list : list of paths containing csv files
    Output : dictionnary containing one entry by distinct schema, with a list of the path corresponding to each schema
    """
    schemas = {}
    i = 0

    for path in path_list:
        # Get first row (header to identify the schema
        try:
            with open(path, "rb") as f:
                reader = csv.reader(f, delimiter=',', escapechar='#')
                schema = reader.next()

            not_assigned = True

            # If the schema already exists in the dictionnary, append the path to the path list corresponding to this schema
            for k,v in schemas.iteritems():
                if schema == v['schema']:
                    schemas[k]['paths'].append(path)
                    not_assigned = False
                    break

            # If the schema does not exist in the dictionnary, create a new entry
            if not_assigned:
                i = i+1
                schemas[str(i)] = {}
                schemas[str(i)]['schema'] = schema
                schemas[str(i)]['paths'] = []
                schemas[str(i)]['paths'].append(path)
        except:
            logging.info('Impossible to get schema for %s', path)

    return schemas

# Function from https://stackoverflow.com/questions/39758045/how-to-perform-union-on-two-dataframes-with-different-amounts-of-columns-in-spar/42159279#42159279    
def harmonize_schemas_and_combine(df_left, df_right):
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
    right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)

    # First go over left-unique fields
    for l_name, l_type, l_nullable in left_fields.difference(right_fields):
        if l_name in right_types:
            r_type = left_types[l_name]
            if l_type != r_type:
                raise TypeError, "Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type)
            else:
                raise TypeError, "Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s"  % (l_name, l_nullable, not(l_nullable))
        df_right = df_right.withColumn(l_name, lit(None).cast(l_type))

    # Now go over right-unique fields
    for r_name, r_type, r_nullable in right_fields.difference(left_fields):
        if r_name in left_types:
            l_type = right_types[r_name]
            if r_type != l_type:
                raise TypeError, "Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type)
            else:
                raise TypeError, "Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not(r_nullable))
        df_left = df_left.withColumn(r_name, lit(None).cast(r_type))    

    # Make sure columns are in the same order
    df_left = df_left.select(df_right.columns)

    return df_left.union(df_right)

def create_df_path_list(path_list):
    """
    Create a dataframe by concatenating all databases from the path list
    Input : - path_list : list of path corresponding to databases
    Output : concatenated dataframe from all databases from the path list
    """
    # Get all different schemas from path_list
    logging.info('Infering schemas from the path list')
    schemas = get_schemas(path_list)

    logging.info('The path list contains %s different schemas', len(schemas))

    df = sqlContext.createDataFrame(sc.emptyRDD(), pysqlt.StructType([]))
    dict_df = {}
    # Load each set of path with the same schema into separate dataframes
    for k,v in schemas.iteritems():
        logging.info('Reading %s paths with schema %s', len(v['paths']), k)
        dict_df[k] = spark.read.format("csv").option("header","true").option("delimiter", ",").option("escape", "#").load(*v['paths'])

    #Merge all the dataframes
    for k,v in dict_df.iteritems():
        df = harmonize_schemas_and_combine(df, v)

    return df

После этого вам просто нужно вызвать функцию:

import os

csv_paths = [os.path.join("/mnt/Product/", file) for file in os.listdir("/mnt/Product/") if ".csv" in file]

df = create_df_path_list(csv_paths)

Возможно, небудь идеальным, но я надеюсь, что это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...