R: Почему параллель (намного) медленнее?Какова наилучшая стратегия использования параллельного соединения (слева) для большой коллекции больших файлов? - PullRequest
2 голосов
/ 10 апреля 2019

Я прочитал несколько вопросов по предметам, а также некоторые учебные пособия, но не смог решить мою проблему, поэтому решил задать себе вопрос.

У меня есть большая коллекция больших файлов типов, скажем, A, B,C;и мне нужно оставить соединение B, C с A на некоторых условиях.Я работаю на удаленном сервере с 64 ЦП и 240 ГБ, поэтому, естественно, я бы хотел использовать его для параллельной работы и питания.Ключевое знание, которое у меня есть, заключается в том, что если файл a_i может быть успешно соединен только с b_i, b_ (i + 1) из B, то же самое для C. Моя первоначальная попытка состояла в том, чтобы иметь функцию 'join_i' для файла 'a_i', а затем запуститьэто параллельно (у меня 448 файлов).Тем не менее, не было никакого существенного улучшения времени, и фактически, когда я наблюдал за производительностью, к сожалению, загрузка процессора была очень низкой.Насколько я мог разобраться в проблеме, я думаю, что узким местом является IO, особенно потому что все файлы большие.Это правильная гипотеза?В любом случае, со второй попытки я решил последовательно просматривать каждый файл, но использовал параллельное преимущество в каждой итерации.Однако после многочисленных попыток мне здесь тоже не повезло.Я попытался сделать минимальный пример ниже, где параллель намного медленнее (и фактически на моих реальных данных она зависает).Что здесь не так?Это ошибка кода или какое-то более глубокое недопонимание того, как работает параллель в R?Кроме того, я попробовал несколько multidplyr и mclapply, но в обоих случаях тоже не повезло.Я также хочу отметить, что чтение файлов занимает больше, чем само объединение: в течение одной итерации чтение занимает около 30 секунд (я использую fread, разархивирую его внутри через cmd), в то время как объединение занимает около 10 секунд.Какова лучшая стратегия здесь, учитывая это?Заранее спасибо!

library(dplyr) 

A=data.frame(cbind('a', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
B=data.frame(cbind('b', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
C=data.frame(cbind('c', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)


chunk_join=function(i, A, B, C)
{
  A_i=A %>% filter(X2==i)
  B_i=B %>% filter(X2==i) %>% select(X1, X3)
  C_i=C %>% filter(X2==i) %>% select(X1, X3)
  join_i=A_i %>% left_join(B_i, by=c('X3')) %>% left_join(C_i, by=c('X3'))
}

library(parallel)
library(foreach)
cl = parallel::makeCluster(10)
doParallel::registerDoParallel(cl)

# not parallel 

s1=Sys.time()
join1=data.frame()
join1 = foreach(j=1:10, .combine='rbind', 
                .packages=c('dplyr'), 
                .export=c('chunk_join','A', 'B', 'C')) %do%
                {
                  join_i=chunk_join(j, A, B, C)
                }
t1=Sys.time()-s1
colnames(join1)[4:5]=c('joinedB', 'joinedC')
r1=c(sum(!is.na(join1$joinedB)), sum(!is.na(join1$joinedC)))

# parallel 
s2=Sys.time()
join2=data.frame()
join2 = foreach(j=1:10, .combine='rbind', 
                .packages=c('dplyr'), 
                .export=c('chunk_join','A', 'B', 'C')) %dopar%
                {
                  join_i=chunk_join(j, A, B, C)
                }
t2=Sys.time()-s2
stopCluster(cl)
colnames(join2)[4:5]=c('joinedB', 'joinedC')
r2=c(sum(!is.na(join2$joinedB)), sum(!is.na(join2$joinedC)))

R=rbind(r1, r2)
T=rbind(t1, t2)

R
T

На моем сервере это дает около 5 с для% do% и более 1 м для% dopar%.Обратите внимание, что это для объединения, даже без учета времени на создание кластеров.Кстати, кто-нибудь может также прокомментировать, сколько кластеров у меня будет?Скажем, я делю данные на X чанков четного размера и имею доступный Y CPU - я должен поставить Y - как можно больше, или X, или какое-то другое количество кластеров?

1 Ответ

2 голосов
/ 11 апреля 2019

Существует две проблемы, из-за которых ваша многопоточность медленная:

1) Передача данных в новые потоки 2) Передача данных из новых потоков обратно в основные потоки

Проблемы # 1 полностью исключаютсяиспользование mclapply, которое не копирует данные, если они не изменены, в системах Unix.(makeCluster по умолчанию использует сокеты для передачи данных).

Нельзя избежать проблемы # 2, используя mclapply, но вы можете минимизировать объем данных, передаваемых обратно в основной поток.

Наивный mclapply:

join3 = mclapply(1:10, function(j) {
  join_i=chunk_join(j, A, B, C)
}, mc.cores=4) %>% rbindlist

Немного умнее mclapply:

chunk_join2=function(i, A, B, C)
{
  A_i=A %>% filter(X2==i)
  B_i=B %>% filter(X2==i) %>% select(X1, X3)
  C_i=C %>% filter(X2==i) %>% select(X1, X3)
  join_i=A_i %>% left_join(B_i, by=c('X3')) %>% left_join(C_i, by=c('X3'))
  join_i[,c(-1,-2,-3)]
}
A <- arrange(A, X2)
join5 = mclapply(1:10, function(j) {
  join_i=chunk_join2(j, A, B, C)
}, mc.cores=4) %>% rbindlist
join5 <- cbind(A, join5)

Тесты:

Single threaded: 4.014s 

Naive mclapply: 1.860 s

Slightly smarter mclapply: 1.363 s

Если в ваших данных много столбцов, вы можете увидеть, как проблема № 2 полностью поглотит систему.Вы можете сделать еще лучше, например, возвращая индексы B и C вместо целого подмножества data.frame.

...