Я пытаюсь использовать Amazon Elastic Map Reduce для запуска серии симуляций нескольких миллионов случаев.Это потоковое задание Rscript без редуктора.Я использую Identity Reducer в своем вызове EMR --reducer org.apache.hadoop.mapred.lib.IdentityReducer
.
Файл сценария отлично работает при тестировании и запускается локально из командной строки в окне Linux при пропуске одной строки строки вручную echo "1,2443,2442,1,5" | ./mapper.R
, и я получаю одну ожидаемую строку результатов.Однако, когда я протестировал моё моделирование, используя около 10 000 наблюдений (строк) из входного файла EMR, я получил вывод только для дюжины строк или около того из 10k входных строк.Я пробовал несколько раз, и я не могу понять, почему.Задание Hadoop работает без ошибок.Кажется, что входные строки пропускаются, или, возможно, что-то происходит с редуктором идентификации.Результаты являются правильными для случаев, когда есть вывод.
Мой входной файл - CSV со следующим форматом данных, серия из пяти целых чисел, разделенных запятыми:
1,2443,2442,1,5
2,2743,4712,99,8
3,2443,861,282,3177
etc...
Здесьмой R скрипт для mapper.R
#! /usr/bin/env Rscript
# Define Functions
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))
# function to read in the relevant data from needed data files
get.data <- function(casename) {
list <- lapply(casename, function(x) {
read.csv(file = paste("./inputdata/",x, ".csv", sep = ""),
header = TRUE,
stringsAsFactors = FALSE)})
return(data.frame(list))
}
con <- file("stdin")
line <- readLines(con, n = 1, warn = FALSE)
line <- trimWhiteSpace(line)
values <- unlist(strsplit(line, ","))
lv <- length(values)
cases <- as.numeric(values[2:lv])
simid <- paste("sim", values[1], ":", sep = "")
l <- length(cases) # for indexing
## create a vector for the case names
names.vector <- paste("case", cases, sep = ".")
## read in metadata and necessary data columns using get.data function
metadata <- read.csv(file = "./inputdata/metadata.csv", header = TRUE,
stringsAsFactors = FALSE)
d <- cbind(metadata[,1:3], get.data(names.vector))
## Calculations that use df d and produce a string called 'output'
## in the form of "id: value1 value2 value3 ..." to be used at a
## later time for agregation.
cat(output, "\n")
close(con)
(обобщенный) вызов EMR для этого моделирования:
ruby elastic-mapreduce --create --stream --input s3n://bucket/project/input.txt --output s3n://bucket/project/output --mapper s3n://bucket/project/mapper.R --reducer org.apache.hadoop.mapred.lib.IdentityReducer --cache-archive s3n://bucket/project/inputdata.tar.gz#inputdata --name Simulation --num-instances 2
Если у кого-то есть какие-либо идеи относительно того, почемуЯ могу столкнуться с этими проблемами, я открыт для предложений, а также для любых изменений / оптимизации скрипта R.
Другой вариант - превратить скрипт в функцию и запустить распараллеленное применение, используя многоядерные пакеты R, но я еще не пробовал.Я хотел бы заставить это работать на EMR.Я использовал JD Long's и Pete Skomoroch's R / EMR примеры в качестве основы для создания сценария.