В настоящее время у меня есть простая обработка данных, которая включает в себя группирование, объединение и параллельные операции столбца к столбцу. Не очень простая часть - это массивный используемый ряд (его подробные данные о стоимости / финансовых показателях). его размер составляет 300-400 ГБ.
из-за ограниченного объема ОЗУ, в настоящее время я использую неосновные вычисления с dask. Тем не менее, это действительно медленно.
Ранее я читал, используя CuDF для повышения производительности на map_partitions и groupby, однако большинство примеров используют GPU среднего класса (по крайней мере, 1050ti, большинство работает на облаке на основе gv). vm) и данные могут уместиться на ОЗУ GPU.
На моей машине установлены c E5-2620v3 (6C / 12T), 128 ГБ и K620 (только выделенный vram 2 ГБ).
Промежуточный используемый кадр данных хранится в паркете.
Будет ли это быстрее, если я использую низкоуровневый графический процессор с использованием CuDF? и возможно ли это сделать из базовых вычислений в графическом процессоре? (я смотрю вокруг, например, но еще не нашел)
Ниже приведен упрощенный псевдокод того, что я пытаюсь сделать
a.csv - это данные размером ~ 300 ГБ, состоящие из 3 столбцов (Hier1, Hier2, Hier3, значение). Hier1-3 - иерархия в виде строки. value - значение продаж. b.csv - данные размером ~ 50 ГБ, состоящие из 3 столбцов (Hier1, Hier2, valuetype, cost). Hier1-2 - это иерархия в виде строки. Тип значения - тип стоимости, в строке. себестоимость - это стоимостное значение
По сути, мне нужно сделать пропорциональную сверху вниз, основываясь на стоимости продаж из a.csv для каждой стоимости в b.csv. В конце концов, у меня есть каждая доступная стоимость на уровне Hier3 (более детальный уровень)
Первый шаг - создать пропорциональное соотношение:
import dask.dataframe as dd
# read raw data, repartition, convert to parquet for both file
raw_reff = dd.read_csv('data/a.csv')
raw_reff = raw_reff.map_partitions(lambda df: df.assign(PartGroup=df['Hier1']+df['Hier2']))
raw_reff = raw_reff.set_index('PartGroup')
raw_reff.to_parquet("data/raw_a.parquet")
cost_reff = dd.read_csv('data/b.csv')
cost_reff = cost_reff.map_partitions(lambda df: df.assign(PartGroup=df['Hier1']+df['Hier2']))
cost_reff = cost_reff.set_index('PartGroup')
cost_reff.to_parquet("data/raw_b.parquet")
# create reference ratio
ratio_reff = dd.read_parquet("data/raw_a.parquet").reset_index()
#to push down ram usage, instead of dask groupby im using groupby on each partition. Should be ok since its already partitioned above on each group
ratio_reff = ratio_reff.map_partitions(lambda df: df.groupby(['PartGroup'])['value'].sum().reset_index())
ratio_reff = ratio_reff.set_index('PartGroup')
ratio_reff = ratio_reff.map_partitions(lambda df: df.rename(columns={'value':'value_on_group'}))
ratio_reff.to_parquet("data/reff_a.parquet")
, а затем выполнить слияние, чтобы получить соотношение
raw_data = dd.read_parquet("data/raw_a.parquet").reset_index()
reff_data = dd.read_parquet("data/reff_a.parquet").reset_index()
ratio_data = raw_data.merge(reff_data, on=['PartGroup'], how='left')
ratio_data['RATIO'] = ratio_data['value'].fillna(0)/ratio_data['value_on_group'].fillna(0)
ratio_data = ratio_data[['PartGroup','Hier3','RATIO']]
ratio_data = ratio_data.set_index('PartGroup')
ratio_data.to_parquet("data/ratio_a.parquet")
, а затем объединить и умножить данные о затратах в PartGroup на Ratio, чтобы получить его пропорциональное значение
reff_stg = dd.read_parquet("data/ratio_a.parquet").reset_index()
cost_stg = dd.read_parquet("data/raw_b.parquet").reset_index()
final_stg = reff_stg.merge(cost_stg, on=['PartGroup'], how='left')
final_stg['allocated_cost'] = final_stg['RATIO']*final_stg['cost']
final_stg = final_stg.set_index('PartGroup')
final_stg.to_parquet("data/result_pass1.parquet")
, в реальном случае будет остаточная стоимость, вызванная отсутствием справочных данных et c и это будет сделано за несколько проходов с использованием нескольких ссылок, но в основном выше описан шаг
даже при работе строго с паркетом на паркет, он все еще занимает ~ 80 ГБ ОЗУ из моих 128 ГБ, все мои ядро работает на 100%, и 3-4 дня на запуск. Я ищу способы сделать это быстрее с современным оборудованием. Как вы можете видеть, это огромная проблема, которая вписывается в определение для обработки на основе GPU
Спасибо