Я прочитал несколько вопросов по предметам, а также некоторые учебные пособия, но не смог решить мою проблему, поэтому решил задать себе вопрос.
У меня есть большая коллекция больших файлов типов, скажем, A, B,C;и мне нужно оставить соединение B, C с A на некоторых условиях.Я работаю на удаленном сервере с 64 ЦП и 240 ГБ, поэтому, естественно, я бы хотел использовать его для параллельной работы и питания.Ключевое знание, которое у меня есть, заключается в том, что если файл a_i может быть успешно соединен только с b_i, b_ (i + 1) из B, то же самое для C. Моя первоначальная попытка состояла в том, чтобы иметь функцию 'join_i' для файла 'a_i', а затем запуститьэто параллельно (у меня 448 файлов).Тем не менее, не было никакого существенного улучшения времени, и фактически, когда я наблюдал за производительностью, к сожалению, загрузка процессора была очень низкой.Насколько я мог разобраться в проблеме, я думаю, что узким местом является IO, особенно потому что все файлы большие.Это правильная гипотеза?В любом случае, со второй попытки я решил последовательно просматривать каждый файл, но использовал параллельное преимущество в каждой итерации.Однако после многочисленных попыток мне здесь тоже не повезло.Я попытался сделать минимальный пример ниже, где параллель намного медленнее (и фактически на моих реальных данных она зависает).Что здесь не так?Это ошибка кода или какое-то более глубокое недопонимание того, как работает параллель в R?Кроме того, я попробовал несколько multidplyr и mclapply, но в обоих случаях тоже не повезло.Я также хочу отметить, что чтение файлов занимает больше, чем само объединение: в течение одной итерации чтение занимает около 30 секунд (я использую fread, разархивирую его внутри через cmd), в то время как объединение занимает около 10 секунд.Какова лучшая стратегия здесь, учитывая это?Заранее спасибо!
library(dplyr)
A=data.frame(cbind('a', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
B=data.frame(cbind('b', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
C=data.frame(cbind('c', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
chunk_join=function(i, A, B, C)
{
A_i=A %>% filter(X2==i)
B_i=B %>% filter(X2==i) %>% select(X1, X3)
C_i=C %>% filter(X2==i) %>% select(X1, X3)
join_i=A_i %>% left_join(B_i, by=c('X3')) %>% left_join(C_i, by=c('X3'))
}
library(parallel)
library(foreach)
cl = parallel::makeCluster(10)
doParallel::registerDoParallel(cl)
# not parallel
s1=Sys.time()
join1=data.frame()
join1 = foreach(j=1:10, .combine='rbind',
.packages=c('dplyr'),
.export=c('chunk_join','A', 'B', 'C')) %do%
{
join_i=chunk_join(j, A, B, C)
}
t1=Sys.time()-s1
colnames(join1)[4:5]=c('joinedB', 'joinedC')
r1=c(sum(!is.na(join1$joinedB)), sum(!is.na(join1$joinedC)))
# parallel
s2=Sys.time()
join2=data.frame()
join2 = foreach(j=1:10, .combine='rbind',
.packages=c('dplyr'),
.export=c('chunk_join','A', 'B', 'C')) %dopar%
{
join_i=chunk_join(j, A, B, C)
}
t2=Sys.time()-s2
stopCluster(cl)
colnames(join2)[4:5]=c('joinedB', 'joinedC')
r2=c(sum(!is.na(join2$joinedB)), sum(!is.na(join2$joinedC)))
R=rbind(r1, r2)
T=rbind(t1, t2)
R
T
На моем сервере это дает около 5 с для% do% и более 1 м для% dopar%.Обратите внимание, что это для объединения, даже без учета времени на создание кластеров.Кстати, кто-нибудь может также прокомментировать, сколько кластеров у меня будет?Скажем, я делю данные на X чанков четного размера и имею доступный Y CPU - я должен поставить Y - как можно больше, или X, или какое-то другое количество кластеров?