Dask - dataframe.read_csv не распознает правильные типы данных - PullRequest
0 голосов
/ 29 апреля 2019

Следующий код предназначен для чтения простого файла .csv с четырьмя столбцами со строковыми значениями и строкой заголовка.Затем к фрейму добавляется еще один столбец, который занимает каждую строку в столбце «опубликовано» (строка даты) и предоставляет соответствующий день недели для каждой строки.Однако код выдает ошибку «не реализовано» и, похоже, не распознает типы данных, даже если они определены в параметрах функции (см. Сообщение об ошибке ниже).

Я пытался использовать Dataframe.read_csv, указав и не указав типы данных столбца, но получил ту же ошибку.Строка, в которой происходит ошибка, является списком, но у меня та же ошибка с циклом.Кадры данных кажутся правильными, когда я их распечатываю, но все типы данных являются «объектными», что неверно.

«NotImplemented», кажется, означает, что Dataframe изменяется, но поскольку все операции выполняются в отдельном кадрегде Dask видит объект не из серии?

from dask import delayed, compute, visualize, dataframe

...

def treat(frame):
    frame["day"] = [pd.Timestamp(value) for value in frame.posted]
    print(frame.columns)
    return frame

def find_files():
...

def construct_frames(files):
    dataframes = []
    # choose 3 of all the files
    selection = [files[random.randrange(len(files) - 1)] for i in range(1,4)]
    for pair in selection:
        key = pair[0]
        file = pair[1]
        path = os.path.join(TOP_DIR + "/engagement_id=" + key + "/" + file)
        data = dataframe.read_csv(path,
                                  dtype={"id":str,"data_import_id": str, "posted": str, "amount": str})
        print(data.columns, data.head())
        treat(data)
        dataframes.append(data)
    return dataframes

files = find_files()
dataframes = construct_frames(files)
visualize(dataframes)

Вывод (в Jupyter):

Dask DataFrame Structure:
                   id data_import_id  posted  amount
npartitions=1                                       
               object         object  object  object
                  ...            ...     ...     ...
Dask Name: from-delayed, 3 tasks
---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
<ipython-input-8-e30d04e9aed0> in <module>
     47 
     48 files = find_files()
---> 49 dataframes = construct_frames(files)
     50 
     51 

<ipython-input-8-e30d04e9aed0> in construct_frames(files)
     42                                   dtype={"id":str,"data_import_id": str, "posted": str, "amount": str})
     43         print(data)
---> 44         treat(data)
     45         dataframes.append(data)
     46     return dataframes

<ipython-input-8-e30d04e9aed0> in treat(frame)
     15 
     16 def treat(frame):
---> 17     frame["day"] = [pd.Timestamp(value) for value in frame.posted]
     18     print(frame.columns)
     19     return frame

<ipython-input-8-e30d04e9aed0> in <listcomp>(.0)
     15 
     16 def treat(frame):
---> 17     frame["day"] = [pd.Timestamp(value) for value in frame.posted]
     18     print(frame.columns)
     19     return frame

/anaconda3/envs/dask-tutorial/lib/python3.6/site-packages/dask/dataframe/core.py in __getitem__(self, key)
   2059             return Series(graph, name, self._meta, self.divisions)
   2060         raise NotImplementedError(
-> 2061             "Series getitem in only supported for other series objects "
   2062             "with matching partition structure"
   2063         )

NotImplementedError: Series getitem in only supported for other series objects with matching partition structure

Данные выглядят примерно так: буквенно-цифровые строки и строка даты, которая получаетконвертируется в «день» в новом столбце:

id  data_import_id  posted  amount
00000000  3c221ff  2014-01-02T19:00:00.000-05:00  3656506
00000013  3c221ff  2014-01-03T19:00:00.000-05:00  3656506
00000015  3c221ff  2014-01-04T19:00:00.000-05:00  3656506
0000000a  3c221ff  2014-01-05T19:00:00.000-05:00  3656506
00000001  3c221ff  2014-01-06T19:00:00.000-05:00  3656506

1 Ответ

1 голос
/ 30 апреля 2019

Я получил ошибку в этой строке

frame["day"] = [pd.Timestamp(value) for value in frame.posted]

Оказывается, есть несколько возможностей добавить столбец к dask DataFrame

  • эти подходы предполагают, что информация о часовом поясе не важна
  • если важен часовой пояс, см. Комментарий @ MikeB2019x здесь о том, как принять это во внимание

Использование map_partitions (за этот пост SO )

ddf = dataframe.read_csv('test.csv',
                        delimiter="  ",
                        engine='python',
                        dtype={"id":str,"data_import_id": str, "amount": str})
meta = ('posted', 'datetime64[ns]')
ddf['posted'] = ddf.posted.map_partitions(pd.to_datetime, meta=meta)
ddf = treat(ddf)

print(ddf.head())

         id data_import_id                    posted   amount  day_of_week   weekday
0  00000000        3c221ff 2014-01-02 19:00:00-05:00  3656506            2  Thursday
1  00000013        3c221ff 2014-01-03 19:00:00-05:00  3656506            3    Friday
2  00000015        3c221ff 2014-01-04 19:00:00-05:00  3656506            4  Saturday
3  0000000a        3c221ff 2014-01-05 19:00:00-05:00  3656506            5    Sunday
4  00000001        3c221ff 2014-01-06 19:00:00-05:00  3656506            6    Monday

print(ddf.dtypes)
id                        object
data_import_id            object
posted            datetime64[ns]
amount                    object
day_of_week                int64
weekday                   object
dtype: object

Использование .to_datetime (за этот пост SO )

ddf = dataframe.read_csv('test.csv',
                        delimiter="  ",
                        engine='python',
                        dtype={"id":str,"data_import_id": str, "amount": str})
ddf['posted']=dataframe.to_datetime(ddf.posted, format="%Y%m%d %H:%M:%S") # option 1
# ddf['posted']=dataframe.to_datetime(ddf.posted, unit='ns') # option 2
ddf = treat(ddf)

print(ddf.head())
         id data_import_id                    posted   amount  day_of_week   weekday
0  00000000        3c221ff 2014-01-02 19:00:00-05:00  3656506            2  Thursday
1  00000013        3c221ff 2014-01-03 19:00:00-05:00  3656506            3    Friday
2  00000015        3c221ff 2014-01-04 19:00:00-05:00  3656506            4  Saturday
3  0000000a        3c221ff 2014-01-05 19:00:00-05:00  3656506            5    Sunday
4  00000001        3c221ff 2014-01-06 19:00:00-05:00  3656506            6    Monday

print(ddf.dtypes)
id                        object
data_import_id            object
posted            datetime64[ns]
amount                    object
day_of_week                int64
weekday                   object
dtype: object

Или просто укажите аргумент parse_dates для .read_csv

ddf = dataframe.read_csv('test.csv',
                        delimiter="  ",
                        engine='python',
                        parse_dates=['posted'],
                        dtype={"id":str,"data_import_id": str, "amount": str})
ddf = treat(ddf)

print(ddf.head())
         id data_import_id                    posted   amount  day_of_week   weekday
0  00000000        3c221ff 2014-01-02 19:00:00-05:00  3656506            2  Thursday
1  00000013        3c221ff 2014-01-03 19:00:00-05:00  3656506            3    Friday
2  00000015        3c221ff 2014-01-04 19:00:00-05:00  3656506            4  Saturday
3  0000000a        3c221ff 2014-01-05 19:00:00-05:00  3656506            5    Sunday
4  00000001        3c221ff 2014-01-06 19:00:00-05:00  3656506            6    Monday

print(ddf.dtypes)
id                                                object
data_import_id                                    object
posted            datetime64[ns, pytz.FixedOffset(-300)]
amount                                            object
day_of_week                                        int64
weekday                                           object
dtype: object

BTW, datetime атрибуты (.dt datetime namespace) могут использоваться в сериях dask аналогично пандам - ​​см. здесь

def treat(frame):
    frame['day_of_week'] = frame['posted'].dt.day
    frame['weekday'] = frame['posted'].dt.weekday_name
    return frame
...