У меня есть большой набор данных (50 миллионов строк), в котором мне нужно выполнить некоторые построчные вычисления, например, получить пересечение двух наборов (каждый в отдельном столбце)
например
col_1:{1587004, 1587005, 1587006, 1587007}
col_2:{1587004, 1587005}
col_1.intersection(col_2) = {1587004, 1587005}
Это прекрасно работает для моего фиктивного набора данных (100 000) строк.Однако, когда я пытаюсь сделать то же самое с реальным, память заканчивается
Моя кодировка работает с использованием pandas 1: 1, портирование ее на dask не работает NotImplementedError: getitem для серии поддерживается только для других объектов сериис соответствующей структурой разделов
игра с map_partitions до сих пор не работала
рабочий код:
df["intersection"] = [col_1.intersection(col_2) for col_1,col2 in zip(df.col_1,df.col_2)]
замена pandas df на dask df выполняется в нереализованная ошибка:
ddf["intersection"] = [col_1.intersection(col_2) for col_1,col2 in zip(df.col_1,df.col_2)]
с использованием map_partions "работает", но я не понимаю, как назначить результат для моего существующего ddf
def intersect_sets(df, col_1, col_2):
result = df[col_1].intersection(df[col_2])
return result
newCol = ddf.map_partitions(lambda df : df.apply(lambda series: intersect_sets(series,"col_1","col_2"),axis=1),meta=str).compute()
, просто делая:
ddf['result'] = newCol
Приводит к: ValueError: Не все разделы известны, не могут выровнять разделы.Пожалуйста, используйте set_index
для установки индекса.
update: сброс индекса удаляет ошибку, однако тогда столбец, содержащий пересечения, больше не соответствует другим двум столбцам.Похоже, порядок был испорчен ...
ddf2 = ddf.reset_index().set_index('index')
ddf2 ['result'] = result
Я бы ожидал dask-фрейм данных со следующими столбцами
col_1:{1587004, 1587005, 1587006, 1587007}
col_2:{1587004, 1587005}
col_3:{1587004, 1587005}
Не только отлично работающее решение ценится, но и некоторыепонимание того, как работает map_partitions, мне уже очень помогло бы:)
update: Благодаря M.Rocklin я понял это.На будущее я или другие спотыкаемся об этом вопросе:
ddf = ddf.assign(
new_col = ddf.map_partitions(
lambda df : df.apply(
lambda series:intersect_sets(
series,"col_1","col_2"),axis=1),meta=str)
)
df = ddf.compute()