GPU нижнего уровня против среднего процессора для обработки данных - PullRequest
0 голосов
/ 10 января 2020

В настоящее время у меня есть простая обработка данных, которая включает в себя группирование, объединение и параллельные операции столбца к столбцу. Не очень простая часть - это массивный используемый ряд (его подробные данные о стоимости / финансовых показателях). его размер составляет 300-400 ГБ.

из-за ограниченного объема ОЗУ, в настоящее время я использую неосновные вычисления с dask. Тем не менее, это действительно медленно.

Ранее я читал, используя CuDF для повышения производительности на map_partitions и groupby, однако большинство примеров используют GPU среднего класса (по крайней мере, 1050ti, большинство работает на облаке на основе gv). vm) и данные могут уместиться на ОЗУ GPU.

На моей машине установлены c E5-2620v3 (6C / 12T), 128 ГБ и K620 (только выделенный vram 2 ГБ).

Промежуточный используемый кадр данных хранится в паркете.

Будет ли это быстрее, если я использую низкоуровневый графический процессор с использованием CuDF? и возможно ли это сделать из базовых вычислений в графическом процессоре? (я смотрю вокруг, например, но еще не нашел)

Ниже приведен упрощенный псевдокод того, что я пытаюсь сделать

a.csv - это данные размером ~ 300 ГБ, состоящие из 3 столбцов (Hier1, Hier2, Hier3, значение). Hier1-3 - иерархия в виде строки. value - значение продаж. b.csv - данные размером ~ 50 ГБ, состоящие из 3 столбцов (Hier1, Hier2, valuetype, cost). Hier1-2 - это иерархия в виде строки. Тип значения - тип стоимости, в строке. себестоимость - это стоимостное значение

По сути, мне нужно сделать пропорциональную сверху вниз, основываясь на стоимости продаж из a.csv для каждой стоимости в b.csv. В конце концов, у меня есть каждая доступная стоимость на уровне Hier3 (более детальный уровень)

Первый шаг - создать пропорциональное соотношение:

import dask.dataframe as dd
# read raw data, repartition, convert to parquet for both file
raw_reff = dd.read_csv('data/a.csv')
raw_reff = raw_reff.map_partitions(lambda df: df.assign(PartGroup=df['Hier1']+df['Hier2']))
raw_reff = raw_reff.set_index('PartGroup')
raw_reff.to_parquet("data/raw_a.parquet")

cost_reff = dd.read_csv('data/b.csv')
cost_reff = cost_reff.map_partitions(lambda df: df.assign(PartGroup=df['Hier1']+df['Hier2']))
cost_reff = cost_reff.set_index('PartGroup')
cost_reff.to_parquet("data/raw_b.parquet")

# create reference ratio
ratio_reff = dd.read_parquet("data/raw_a.parquet").reset_index()

#to push down ram usage, instead of dask groupby im using groupby on each partition. Should be ok since its already partitioned above on each group

ratio_reff = ratio_reff.map_partitions(lambda df: df.groupby(['PartGroup'])['value'].sum().reset_index())
ratio_reff = ratio_reff.set_index('PartGroup')
ratio_reff = ratio_reff.map_partitions(lambda df: df.rename(columns={'value':'value_on_group'}))
ratio_reff.to_parquet("data/reff_a.parquet")

, а затем выполнить слияние, чтобы получить соотношение

raw_data = dd.read_parquet("data/raw_a.parquet").reset_index()
reff_data = dd.read_parquet("data/reff_a.parquet").reset_index()
ratio_data = raw_data.merge(reff_data, on=['PartGroup'], how='left')
ratio_data['RATIO'] = ratio_data['value'].fillna(0)/ratio_data['value_on_group'].fillna(0)
ratio_data = ratio_data[['PartGroup','Hier3','RATIO']]
ratio_data = ratio_data.set_index('PartGroup')
ratio_data.to_parquet("data/ratio_a.parquet")

, а затем объединить и умножить данные о затратах в PartGroup на Ratio, чтобы получить его пропорциональное значение

reff_stg = dd.read_parquet("data/ratio_a.parquet").reset_index()
cost_stg = dd.read_parquet("data/raw_b.parquet").reset_index()
final_stg = reff_stg.merge(cost_stg, on=['PartGroup'], how='left')
final_stg['allocated_cost'] = final_stg['RATIO']*final_stg['cost']
final_stg = final_stg.set_index('PartGroup')
final_stg.to_parquet("data/result_pass1.parquet")

, в реальном случае будет остаточная стоимость, вызванная отсутствием справочных данных et c и это будет сделано за несколько проходов с использованием нескольких ссылок, но в основном выше описан шаг

даже при работе строго с паркетом на паркет, он все еще занимает ~ 80 ГБ ОЗУ из моих 128 ГБ, все мои ядро работает на 100%, и 3-4 дня на запуск. Я ищу способы сделать это быстрее с современным оборудованием. Как вы можете видеть, это огромная проблема, которая вписывается в определение для обработки на основе GPU

Спасибо

1 Ответ

0 голосов
/ 21 февраля 2020

@ Так же, к сожалению, это невозможно сделать с вашим текущим оборудованием. Ваш K620 имеет графический процессор с архитектурой Kepler и соответствует минимальным требованиям для RAPIDS. Для запуска RAPIDS вам понадобится карта Pascal или выше. Хорошая новость заключается в том, что если покупка видеокарты, совместимой с RAPIDS, нецелесообразна, существует множество недорогих вариантов облачного обеспечения. Честно говоря, то, что вы просите сделать, я бы хотел немного увеличить скорость обработки на GPU и рекомендовал бы использовать настройку с несколькими GPU.

Что касается набора данных большего размера, чем RAM GPU, вы можете использовать dask_cudf, чтобы разрешить обработку вашего набора данных. В наших документах и ​​тетрадях есть несколько примеров. Обращаем ваше внимание, что результирующий набор данных после dask.compute () должен уместиться в ОЗУ графического процессора.

https://rapidsai.github.io/projects/cudf/en/0.12.0/10min.html#10 -Minutes-to-cuDF-and-Dask-cuDF

https://rapidsai.github.io/projects/cudf/en/0.12.0/dask-cudf.html#multi -gpu-with-dask-cudf

Как только вы сможете получить работающую, RAPID-совместимую мульти-GPU-установку и использовать dask_cudf, вы должен получить очень высокую скорость, особенно для такого размера исследования данных.

Надеюсь, это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...