Эффективное объединение 2,1 миллиарда записей в наборах данных - PullRequest
0 голосов
/ 20 февраля 2020

У меня есть несколько наборов данных, состоящих из 2 столбцов: ID и ActivityDate. В наборе данных ID+ActivityDate является уникальным. Каждый набор данных имеет длину около 35 миллионов записей. Существует более 60 наборов данных.

Мой желаемый результат в основном ID, FirstActivityDate и LastActivityDate. Это, в основном, часть сокращения задания карты / сокращения.

Моя первая попытка была в основном прочитать первый набор данных, установить базовую линию, а затем, когда я читал следующий набор данных, я выполняю foreach, сравнивая и обновляя LastActivityDate. Хотя используемая память была очень приемлемой (с 2 ГБ, но постоянно ниже 1,25 ГБ), это заняло слишком много времени. Я сделал расчет, результирующий набор должен иметь длину около 1,5 ГБ, поэтому он может управляться с локальной памятью.

for x in files:
    parsedData = parseFile(x)
    dt = parsedData[0]
    cards = parsedData[1]
    for card in cards:
        #num = int(card[:16])
        if card in result:
            result[card].lastRecharged = dt
        else:
            result[card]=CreditCard(dt)

Комментируя эту строку #num = int(card[:16]), скорость выполнения l oop упала до 30 секунд на файл (оригинал был около 150 секунд), но теперь память вышла из-под контроля. Синтаксический анализ файла - это, в основном, чтение файла, которое занимает менее 1 секунды.

Моя вторая попытка была с использованием pandas, но я не смог объединить наборы данных так, как я хочу. Я должен сказать, что я не опытный в pandas.

Есть ли третий вариант?

Ответы [ 2 ]

0 голосов
/ 26 февраля 2020

IIU C вас интересует первое и последнее ActivityDate для каждого ID. В этом случае вы можете использовать dask . Предположим, что все ваши файлы являются CSV, и они хранятся в папке с именем data.

import dask.dataframe as dd
import pandas as pd

df = dd.read_csv("data/*.csv")

# convert ActivityDate to datetime
df["ActivityDate"] = df["ActivityDate"].astype("M8[us]")

# use aggregate
out = df.groupby("ID")\
        .agg({"ActivityDate":["min", "max"]})\
        .compute()

out.columns = ["_".join(col) for col in out.columns]
out = out.reset_index()
out = out.rename(columns={'ActivityDate_min':"FirstActivityDate",
                          'ActivityDate_max':"LastActivityDate"})
0 голосов
/ 26 февраля 2020

Я закончил тем, что приблизился к своей цели.

Кулак Я сделал чтение и анализ в память параллельным, и в пакетах, используя multithreading.pool, каждый результат помещался в очередь. Каждый пул будет иметь 3 последовательных файла с циклом в 4 pools. В 5-м пуле я предварительно объединяю словари, отбрасывая неповторяющиеся ключи (карточки). Затем в 6-м пуле я делаю последнее слияние.

Весь процесс занимает около 75 секунд на моей локальной машине. Все это в конечном итоге потребляет более 4 ГБ оперативной памяти, что не является идеальным, но управляемым.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...