Можно ли получить пересечение множеств с помощью dask? - PullRequest
2 голосов
/ 01 мая 2019

У меня есть большой набор данных (50 миллионов строк), в котором мне нужно выполнить некоторые построчные вычисления, например, получить пересечение двух наборов (каждый в отдельном столбце)

например

col_1:{1587004, 1587005, 1587006, 1587007}
col_2:{1587004, 1587005}
col_1.intersection(col_2) = {1587004, 1587005}

Это прекрасно работает для моего фиктивного набора данных (100 000) строк.Однако, когда я пытаюсь сделать то же самое с реальным, память заканчивается

Моя кодировка работает с использованием pandas 1: 1, портирование ее на dask не работает NotImplementedError: getitem для серии поддерживается только для других объектов сериис соответствующей структурой разделов

игра с map_partitions до сих пор не работала

рабочий код:

df["intersection"] = [col_1.intersection(col_2) for col_1,col2 in zip(df.col_1,df.col_2)]

замена pandas df на dask df выполняется в нереализованная ошибка:

ddf["intersection"] = [col_1.intersection(col_2) for col_1,col2 in zip(df.col_1,df.col_2)]

с использованием map_partions "работает", но я не понимаю, как назначить результат для моего существующего ddf

def intersect_sets(df, col_1, col_2):
    result = df[col_1].intersection(df[col_2])
    return result

newCol = ddf.map_partitions(lambda df : df.apply(lambda series: intersect_sets(series,"col_1","col_2"),axis=1),meta=str).compute()

, просто делая:

ddf['result'] = newCol

Приводит к: ValueError: Не все разделы известны, не могут выровнять разделы.Пожалуйста, используйте set_index для установки индекса.

update: сброс индекса удаляет ошибку, однако тогда столбец, содержащий пересечения, больше не соответствует другим двум столбцам.Похоже, порядок был испорчен ...

ddf2 = ddf.reset_index().set_index('index')
ddf2 ['result'] = result

Я бы ожидал dask-фрейм данных со следующими столбцами

col_1:{1587004, 1587005, 1587006, 1587007}
col_2:{1587004, 1587005}
col_3:{1587004, 1587005}

Не только отлично работающее решение ценится, но и некоторыепонимание того, как работает map_partitions, мне уже очень помогло бы:)

update: Благодаря M.Rocklin я понял это.На будущее я или другие спотыкаемся об этом вопросе:

ddf = ddf.assign(
       new_col = ddf.map_partitions(
           lambda df : df.apply(
                        lambda series:intersect_sets(
                                series,"col_1","col_2"),axis=1),meta=str)
 )
 df = ddf.compute()

1 Ответ

1 голос
/ 18 мая 2019

Если у вас есть функция, которая работает с фреймами данных pandas:

def f(df: pandas.DataFrame) -> pandas.Series:
    return df.apply(...)

Тогда вы можете отобразить эту функцию на свои разделы

df['new'] = df.map_partitions(f)

Я думаю, что ваша проблема в том, что вы 'мы напрасно вызвали вычисление здесь, и поэтому вы пытаетесь поместить кадр данных pandas в кадр данных dask.

# Don't do this
new = df.map_partitions(f).compute() 
df['new'] = new  # tries to put a pandas dataframe into a dask dataframe
...