Я столкнулся с большим количеством файлов с данными в формате date
, id
и несколькими другими столбцами. Объем данных превышает 10 ГБ в памяти. Однако в файлах есть повторяющиеся записи (одинаковые (дата, идентификатор) пары). После устранения дубликатов набор данных составляет всего около 1 ГБ. Я хотел использовать Dask для выполнения процесса дедупликации. Однако моя текущая реализация запредельно медленная. Я думаю, было бы хорошо разделить данные по дате. Однако я не смог отсортировать данные один раз в сумерках. Установка даты в качестве индекса занимает очень много времени.
Я написал пример с тестовыми данными. Я был бы очень признателен, если бы кто-то мог взглянуть, если есть лучший способ решить эту проблему. Фактические данные, с которыми я работаю, имеют 150 тыс. Строк на набор данных (~ 50 МБ). В следующем примере время вычислений очень быстро увеличивается, поэтому этот метод не может быть применен к моим реальным данным.
def get_test_df(N=20):
import random
import pandas as pd
uids = [f'id_{ix}' for ix in range(int(N/5))]
udates = pd.date_range(start='20000101', periods=int(N/10))
data = {
'date': random.choices(udates, k=N),
'id': random.choices(uids, k=N),
'vals1': random.choices(list(range(5))+5*[pd.NA], k=N),
'vals2': random.choices(list(range(5))+5*[pd.NA], k=N),
}
dft = pd.DataFrame(data).sort_values(['date', 'id'])
return dft
from functools import reduce
def apply_merge_rows_with_missing_info(df):
'''Apply function to use when deduplicatig point data
'''
# return reduce(lambda x, y: x.combine_first(y), df.values)
items = [x for _, x in df.iterrows()]
# Use combine_first to fill potential gaps in duplicates
aggrow = reduce(lambda x, y: x.combine_first(y), items)
return aggrow
# script
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
from dask.distributed import Client
cluster = LocalCluster(n_workers=6, dashboard_address=':34610', processes=True)
client = Client(cluster)
client
dfs = [get_test_df(N=100) for _ in range(15)]
df = dd.concat(dfs)
out = df.groupby(['date', 'id']).apply(apply_merge_rows_with_missing_info)
# This step takes very long when the data grows
res = out.compute()
res