Ошибка разницы схемы DASK с недостаточной детализацией - PullRequest
0 голосов
/ 09 января 2019

Я имею в виду этот вопрос - dask dataframe читать разность схемы паркета

Но метаданные, возвращаемые Dask, не указывают на различия между разными кадрами данных. Вот мой код, который анализирует детали исключения, чтобы найти несовпадающие dtypes. Это не находит ничего. Существует до 100 фреймов данных с 717 столбцами (каждый размером ~ 100 МБ).

    try:
        df = dd.read_parquet(data_filenames, columns=list(cols_to_retrieve), engine='pyarrow')
    except Exception as ex:
        # Process the ex message to find the diff, this will break if dask change their error message
        msgs = str(ex).split('\nvs\n')
        cols1 = msgs[0].split('metadata')[0]
        cols1 = cols1.split('was different. \n')[1]
        cols2 = msgs[1].split('metadata')[0]
        df1_err = pd.DataFrame([sub.split(":") for sub in cols1.splitlines()])
        df1_err = df1_err.dropna()
        df2_err = pd.DataFrame([sub.split(":") for sub in cols2.splitlines()])
        df2_err = df2_err.dropna()
        df_err = pd.concat([df1_err, df2_err]).drop_duplicates(keep=False)
        raise Exception('Mismatch dataframes - ' + str(df_err))

Исключение, которое я получаю:

'Mismatch dataframes - Empty DataFrame Columns: [0, 1] Index: []'

Эта ошибка не возникает в fastparquet, но она настолько медленная, что ее невозможно использовать.

Я добавил это к созданию фреймов данных (используя pandas to_parquet для их сохранения), пытаясь объединить dtypes по столбцу

    df_float = df.select_dtypes(include=['float16', 'float64'])
    df = df.drop(df_float.columns, axis=1)

    for col in df_float.columns:
        df_float[col] = df_float.loc[:,col].astype('float32')

    df = pd.concat([df, df_float], axis=1)

    df_int = df.select_dtypes(include=['int8', 'int16', 'int32'])

    try:
        for col in df_int.columns:
            df_int[col] = df_int.loc[:, col].astype('int64')
        df = df.drop(df_int.columns, axis=1)
        df = pd.concat([df, df_int], axis=1)
    except ValueError as ve:
        print('Error with upcasting - ' + str(ve))

Кажется, это работает в соответствии с моим исключением. Но я не могу выяснить, как отличаются фреймы данных, так как исключение, выданное dask read_parquet, не говорит мне? Идеи о том, как определить, что он находит, как разные?

1 Ответ

0 голосов
/ 10 января 2019

Вы можете использовать функцию fastparquet merge , чтобы создать файл метаданных из множества файлов данных (для сканирования всех файлов потребуется некоторое время). После этого pyarrow будет использовать эти файлы метаданных, и этого может быть достаточно, чтобы избавиться от проблемы для вас.

...