Фильтр Dask по индексу DataFrame с использованием отрицательного диапазона - PullRequest
0 голосов
/ 11 января 2019

В моем случае я использую ~ 100 МБ ежедневно. Я использовал Pandas DataFrame в качестве отдельных файлов, но это не удается из-за склонности pandas к принудительному использованию dtypes в зависимости от данных разных дней. Я пытаюсь прочитать их с Dask DataFrame, и это не удается из-за различных схем. С описательными именами столбцов и 717 столбцами сообщение об исключении является неуправляемым (100 КБ плотных двоичных строк фиксированной длины).

Итак, я попытался использовать Dask, чтобы написать мега-паркет, и надеюсь, что он решит проблему с черепом панд dtype. Иногда мне нужно повторно обработать день или 2 в середине полного диапазона данных, которые у меня уже есть.

Мне удалось придумать это до сих пор, это очень уродливо, и я не могу не думать, что есть лучший способ. Кажется, нет никакого способа использовать фильтры с read_parquet, потому что мы фильтруем по индексу. Кажется, нет способа отрицать диапазон значений индекса. Индекс - это просто дата, количество часов и т. Д. Df - это ценность данных за день, а mdf - это мой мега-df с данными за годы

            mdf = dd.read_parquet(self.local_location + self.megafile, engine='pyarrow')
            inx = df.index.unique()
            start1 = '2016-01-01'
            end1 = pd.to_datetime(inx.values.min()).strftime('%Y-%m-%d')
            start2 = pd.to_datetime(inx.values.max()).strftime('%Y-%m-%d')
            end2 = '2029-01-01'
            mdf1 = mdf[start1:end1]
            mdf2 = mdf[start2:end2]
            if len(mdf1) > 0:
                df_usage1 = 1 + mdf1.memory_usage(deep=True).sum().compute() // 100000001

                if len(mdf2) > 0:
                    df_usage2 = 1 + mdf1.memory_usage(deep=True).sum().compute() // 100000001
                    mdf1 = mdf1.append(mdf2, npartitions=df_usage2)
            else:
                if len(mdf2) > 0:
                    df_usage2 = 1 + mdf2.memory_usage(deep=True).sum().compute() // 100000001
                    mdf1 = dd.from_pandas(df).append(mdf2, npartitions=df_usage2)

это также вызывает исключение на

mdf1 = mdf1.append(df, npartitions=df_usage1)

{ValueError}Exactly one of npartitions and chunksize must be specified.

Забавно, потому что именно это я и делаю.

в этом случае df_usage2 = 2

Требуется альтернативный лучший подход и, возможно, объяснение того, что на самом деле не так с дополнением.

1 Ответ

0 голосов
/ 20 февраля 2019

Я рекомендую не указывать ключевое слово npartitions=

...