Dask Объединение наборов данных с перекрывающимся диапазоном дат (дедупликация) - PullRequest
0 голосов
/ 25 февраля 2020

Я столкнулся с большим количеством файлов с данными в формате date, id и несколькими другими столбцами. Объем данных превышает 10 ГБ в памяти. Однако в файлах есть повторяющиеся записи (одинаковые (дата, идентификатор) пары). После устранения дубликатов набор данных составляет всего около 1 ГБ. Я хотел использовать Dask для выполнения процесса дедупликации. Однако моя текущая реализация запредельно медленная. Я думаю, было бы хорошо разделить данные по дате. Однако я не смог отсортировать данные один раз в сумерках. Установка даты в качестве индекса занимает очень много времени.

Я написал пример с тестовыми данными. Я был бы очень признателен, если бы кто-то мог взглянуть, если есть лучший способ решить эту проблему. Фактические данные, с которыми я работаю, имеют 150 тыс. Строк на набор данных (~ 50 МБ). В следующем примере время вычислений очень быстро увеличивается, поэтому этот метод не может быть применен к моим реальным данным.

def get_test_df(N=20):
    import random
    import pandas as pd

    uids = [f'id_{ix}' for ix in range(int(N/5))]
    udates = pd.date_range(start='20000101', periods=int(N/10))
    data = {
        'date': random.choices(udates, k=N),
        'id': random.choices(uids, k=N),
        'vals1': random.choices(list(range(5))+5*[pd.NA], k=N),
        'vals2': random.choices(list(range(5))+5*[pd.NA], k=N),
    }
    dft = pd.DataFrame(data).sort_values(['date', 'id'])
    return dft

from functools import reduce
def apply_merge_rows_with_missing_info(df):
    '''Apply function to use when deduplicatig point data
    '''
    # return reduce(lambda x, y: x.combine_first(y), df.values)
    items = [x for _, x in df.iterrows()]
    # Use combine_first to fill potential gaps in duplicates
    aggrow = reduce(lambda x, y: x.combine_first(y), items)
    return aggrow


# script
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
from dask.distributed import Client
cluster = LocalCluster(n_workers=6, dashboard_address=':34610', processes=True)
client = Client(cluster)
client

dfs = [get_test_df(N=100) for _ in range(15)]
df = dd.concat(dfs)
out = df.groupby(['date', 'id']).apply(apply_merge_rows_with_missing_info)
# This step takes very long when the data grows
res = out.compute()
res
...