Параллельное вычисление с dask, когда необходимо вычислить столбец dataframe - PullRequest
0 голосов
/ 12 сентября 2018

У меня есть 360 миллионов записей данных о наблюдениях за птицами enter image description here, и я хотел бы вычислить центр тяжести каждого вида птиц как функцию дня года, используя dask в распределенномпуть.

Я хотел бы сделать:

df2 = df.groupby(['VERNACULARNAME', 'yearday']).mean()

, но мне нужно сначала вычислить yearday, и я не могу понять, есть ли способ сделать это на лету сdask.Я надеялся, что dask может просто сохранить новые данные для dask рабочих, но когда я пытаюсь:

def yearday(r):
    r['yearday'] = dt.datetime(r['YEAR'], r['MONTH'], r['DAY']).timetuple().tm_yday
    return r

df.apply(yearday, axis=1).persist()

, они не масштабируются.

Если кто-то хочет на самом деле попробовать, данные могут быть загружены следующим образом:

import dask.dataframe as dd
df = dd.read_parquet('s3://esipfed/ebird/EOD_CLO_2016.parq.gz',
            storage_options={'anon': True, 'use_ssl': False})

Примечание. Хотя я назвал этот набор данных EOD_CLO_2016.parq.gz, он разбит на множество объектов в S3ведро для облегчения распараллеливания.Каждый кусок разархивирован.

Есть ли какой-нибудь способ сделать это вычисление на лету распределенным способом или мне нужно написать другой файл данных со столбцом года, прежде чем я использую groupby для выполнения масштабируемой части?

1 Ответ

0 голосов
/ 13 сентября 2018

После того, что вы сделали на своем ноутбуке Я бы изменил шаги до groupby следующим образом

df = dd.read_parquet('s3://esipfed/ebird/EOD_CLO_2016.parq.gz',
                     columns=['YEAR', 'MONTH', 'DAY', 'DECIMALLATITUDE',
                              'DECIMALLONGITUDE', 'VERNACULARNAME'],
                     storage_options={'anon': True, 'use_ssl': False})

df = df.map_partitions(lambda df: df.assign(yearday=pd.to_datetime(df[['YEAR', 'MONTH', 'DAY']]).dt.dayofyear,
                                            lat=np.deg2rad(df['DECIMALLATITUDE'].values),
                                            lon=np.deg2rad(df['DECIMALLONGITUDE'].values)),

                        meta={'YEAR':'i8', 'MONTH':'i8', 'DAY':'i8',
                              'DECIMALLATITUDE':'f8','DECIMALLONGITUDE':'f8',
                              'VERNACULARNAME':'object',
                              'yearday':'i8', 'lat':'f8', 'lon':'f8'})

df = df.map_partitions(lambda df :df.assign(x=np.cos(df['lat'].values) * np.cos(df['lon'].values),
                                            y=np.cos(df['lat'].values) * np.sin(df['lon'].values),
                                            z=np.sin(df['lat'].values)),
                       meta={'YEAR':'i8', 'MONTH':'i8', 'DAY':'i8',
                              'DECIMALLATITUDE':'f8','DECIMALLONGITUDE':'f8',
                              'VERNACULARNAME':'object',
                              'yearday':'i8', 'lat':'f8', 'lon':'f8',
                              'x':'f8', 'y':'f8', 'z':'f8'})

ОБНОВЛЕНИЕ: I 'Я не уверен, если это хорошая идея, чтобы ваши данные были сохранены в виде одного и заархивированного файла вместо нескольких файлов.Рассматривали ли вы различные варианты?

ОБНОВЛЕНИЕ 2: Учитывая, что преобразование из градусов в радианы является линейным, вы можете вычислить lon, lat, а затем x,y,z после groupby.

...